[ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354259#comment-16354259 ]

Nick Travers commented on KAFKA-4669:
-------------------------------------

Chiming in again to note that we're still running into this issue 
intermittently. The failure mode is the same, with a BufferUnderflowException 
and stack trace similar to what I posted above.

For some additional context: when this occurs, it ultimately leads to a JVM that 
cannot exit, as a non-daemon thread is waiting on a latch that will never be 
counted down. Here's the hung thread:
{code:java}
"async-message-sender-0" #120 daemon prio=5 os_prio=0 tid=0x00007f30b4003000 
nid=0x195a1 waiting on condition [0x00007f3105ce1000]
   java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park(java.base@9/Native Method)
        - parking to wait for  <0x00000007852b1b68> (a 
java.util.concurrent.CountDownLatch$Sync)
        at 
java.util.concurrent.locks.LockSupport.park(java.base@9/LockSupport.java:194)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@9/AbstractQueuedSynchronizer.java:871)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@9/AbstractQueuedSynchronizer.java:1024)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@9/AbstractQueuedSynchronizer.java:1331)
        at 
java.util.concurrent.CountDownLatch.await(java.base@9/CountDownLatch.java:232)
        at 
org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
        at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:61)
        at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
        at com.squareup.core.concurrent.Futures2.getAll(Futures2.java:462)
        at 
com.squareup.kafka.ng.producer.KafkaProducer.sendMessageBatch(KafkaProducer.java:214)
        - locked <0x0000000728c71998> (a 
com.squareup.kafka.ng.producer.KafkaProducer)
        at 
com.squareup.kafka.ng.producer.BufferedKafkaProducer$AsyncBatchMessageSender.sendBatch(BufferedKafkaProducer.java:585)
        at 
com.squareup.kafka.ng.producer.BufferedKafkaProducer$AsyncBatchMessageSender.run(BufferedKafkaProducer.java:536)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@9/ThreadPoolExecutor.java:1167)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@9/ThreadPoolExecutor.java:641)
        at java.lang.Thread.run(java.base@9/Thread.java:844)
{code}
[Here is the latch|https://github.com/apache/kafka/blob/0.11.0.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java#L34] that is still open in the ProduceRequestResult. I assume the network thread 
is responsible for counting it down, but if that thread crashes for whatever 
reason, it never gets a chance to call CountDownLatch#countDown.
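
To make the failure mode concrete, here is a minimal, self-contained sketch 
(all names hypothetical, not Kafka code) of how an uncaught exception in the 
thread that owns a latch leaves a non-daemon waiter parked forever, which 
matches the hung-JVM behaviour above:
{code:java}
import java.util.concurrent.CountDownLatch;

// Hypothetical demo, not Kafka code: the "network thread" dies before
// counting the latch down, so the non-daemon main thread parks forever
// and the JVM cannot exit.
public class LatchHangDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);

        Thread networkThread = new Thread(() -> {
            // Simulates NetworkClient.handleCompletedReceives throwing
            // before the result is completed; countDown() is never reached.
            throw new IllegalStateException("correlation id mismatch");
        });
        networkThread.setDaemon(true);
        networkThread.start();

        // Parks forever, exactly like ProduceRequestResult.await().
        done.await();
    }
}
{code}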

Arguably, we should be using a combination of daemon threads and the timed 
version of Future#get, but it _feels_ like something that could be fixed in 
the producer client, even if only to ensure that failed ProduceRequestResults 
can eventually be GC'd, which can't happen while another thread is hung 
waiting on the latch.
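
For what it's worth, here is a rough sketch of the timed-get workaround (the 
topic name, keys, and 30-second timeout are placeholders): bounding the wait 
means a crashed sender thread costs us the batch, but not the JVM:
{code:java}
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimedGetExample {
    public static void main(String[] args)
            throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("example-topic", "key", "value"));
            try {
                // Bounded wait: if the network thread has died and the latch
                // will never be counted down, we time out instead of hanging.
                RecordMetadata metadata = future.get(30, TimeUnit.SECONDS);
                System.out.println("acked at offset " + metadata.offset());
            } catch (TimeoutException e) {
                // Treat as a failed send rather than blocking forever.
                System.err.println("send not acknowledged within 30s; giving up");
            }
        }
    }
}
{code}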

cc: [~rsivaram] [~hachikuji]

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4669
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4669
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.1
>            Reporter: Cheng Ju
>            Assignee: Rajini Sivaram
>            Priority: Critical
>              Labels: reliability
>             Fix For: 0.11.0.1, 1.0.0
>
>
> There is no try/catch in NetworkClient.handleCompletedReceives. If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and believe this bug also exists in the 0.10 versions.
> A real case: first, a correlation-id mismatch exception occurs:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
>       at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>       at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>       at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>       at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>       at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>       at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>       at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>       at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>       at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>       at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>       at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>       at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>       at java.lang.Thread.run(Thread.java:745)
> client code 


