[
https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14284963#comment-14284963
]
Aditya A Auradkar commented on KAFKA-1886:
------------------------------------------
If interested, I hacked an existing test for this.
def testConsumerEmptyTopic() {
val newTopic = "new-topic"
TestUtils.createTopic(zkClient, newTopic, numPartitions = 1,
replicationFactor = 1, servers = servers)
val thread = new Thread {
override def run {
System.out.println("Starting the fetch")
val start = System.currentTimeMillis()
try
{
val fetchResponse = consumer.fetch(new
FetchRequestBuilder().minBytes(100000).maxWait(3000).addFetch(newTopic, 0, 0,
10000).build())
}
catch {
case e: Throwable =>{
val end = System.currentTimeMillis()
System.out.println("Caught exception" + e + ". Took " + (end -
start));
System.out.println("Fetch interrupted " +
Thread.currentThread().isInterrupted)
}
}
}
}
thread.start()
Thread.sleep(1000)
thread.interrupt()
thread.join()
System.out.println("Ending test")
}
> SimpleConsumer swallowing ClosedByInterruptException
> ----------------------------------------------------
>
> Key: KAFKA-1886
> URL: https://issues.apache.org/jira/browse/KAFKA-1886
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Reporter: Aditya A Auradkar
> Assignee: Jun Rao
>
> This issue was originally reported by a Samza developer. I've included an
> exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on
> my dev setup.
> From: criccomi
> Hey all,
> Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to
> interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches
> Throwable in its sendRequest method [2]. I'm wondering: if
> blockingChannel.send/receive throws a ClosedByInterruptException
> when the thread is interrupted, what happens? It looks like sendRequest will
> catch the exception (which I
> think clears the thread's interrupted flag), and then retries the send. If
> the send succeeds on the retry, I think that the ClosedByInterruptException
> exception is effectively swallowed, and the BrokerProxy will continue
> fetching messages as though its thread was never interrupted.
> Am I misunderstanding how things work?
> Cheers,
> Chris
> [1]
> https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126
> [2]
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)