[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Ju updated KAFKA-4669:
----------------------------
    Description: 
There is no try/catch in NetworkClient.handleCompletedReceives. If an
exception is thrown after inFlightRequests.completeNext(source) has removed the
request, the corresponding RecordBatch's done() is never called, and
KafkaProducer.flush() hangs waiting on that RecordBatch.
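The Java sketch below is a minimal, self-contained model of this failure mode. It is not Kafka's code. Batch, handleReceive and handleReceiveSafely are hypothetical stand-ins for RecordBatch and NetworkClient.handleCompletedReceives. A Deque stands in for InFlightRequests, and a CountDownLatch plays the role of the result that flush() waits on.

Once the request has been removed from the queue, an exception thrown before done() leaves the latch at 1. The try/catch that completes the batch exceptionally is one possible fix pattern, assumed here for illustration only.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class HandleReceiveSketch {

    // Hypothetical stand-in for RecordBatch: done() releases every thread waiting on the latch.
    static final class Batch {
        final int correlationId;
        final CountDownLatch latch = new CountDownLatch(1);
        volatile RuntimeException error; // records the failure, if any

        Batch(int correlationId) {
            this.correlationId = correlationId;
        }

        void done(RuntimeException e) {
            this.error = e;
            latch.countDown();
        }
    }

    static IllegalStateException mismatch(int responseId, int requestId) {
        return new IllegalStateException("Correlation id for response (" + responseId
                + ") does not match request (" + requestId + ")");
    }

    // Buggy shape: the request leaves the in-flight queue first, then the check throws,
    // so done() is never reached for the removed batch.
    static void handleReceive(Deque<Batch> inFlight, int responseId) {
        Batch req = inFlight.pollFirst(); // plays the role of completeNext(source)
        if (req.correlationId != responseId) {
            throw mismatch(responseId, req.correlationId);
        }
        req.done(null);
    }

    // One possible fix pattern (an assumption, not the actual Kafka patch):
    // complete the removed batch exceptionally before propagating the error.
    static void handleReceiveSafely(Deque<Batch> inFlight, int responseId) {
        Batch req = inFlight.pollFirst();
        try {
            if (req.correlationId != responseId) {
                throw mismatch(responseId, req.correlationId);
            }
            req.done(null);
        } catch (RuntimeException e) {
            req.done(e); // waiters are released with an error instead of hanging
            throw e;
        }
    }

    // Returns true if a thread waiting on the batch would be released after a mismatch.
    static boolean releasedAfterMismatch(boolean safe) {
        Deque<Batch> inFlight = new ArrayDeque<>();
        Batch batch = new Batch(703764);
        inFlight.add(batch);
        try {
            if (safe) {
                handleReceiveSafely(inFlight, 703766);
            } else {
                handleReceive(inFlight, 703766);
            }
        } catch (IllegalStateException e) {
            // The real I/O thread logs this (see the Sender.run log line); the demo ignores it.
        }
        try {
            // Bounded wait so the demo terminates; KafkaProducer.flush() takes no timeout.
            return batch.latch.await(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("buggy: waiter released = " + releasedAfterMismatch(false)); // false
        System.out.println("safe:  waiter released = " + releasedAfterMismatch(true));  // true
    }
}
```

Running main should print false for the unguarded path and true for the guarded one. The correlation ids are borrowed from the log in this report.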

I've checked the 0.10 code and believe this bug also exists in 0.10 versions.

A real case: first, a correlation id mismatch exception occurs:
13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught error in kafka producer I/O thread:
java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
        at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
        at java.lang.Thread.run(Thread.java:745)

Then jstack shows the Flume sink thread that called KafkaProducer.flush() blocked at:
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
        at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
        at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:745)
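On the caller side, the jstack output shows the following chain: flush() reaches RecordAccumulator.awaitFlushCompletion, which blocks in ProduceRequestResult.await on a CountDownLatch. The sketch below models that with plain latches, one per incomplete batch. It illustrates the chain under that assumption and is not Kafka's code.

Here awaitFlushCompletion only mimics the shape of the real method. firstStuckBatch is a hypothetical diagnostic helper, not a Kafka API. It waits under one overall deadline, so a batch whose done() was lost is reported instead of hanging the caller.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class FlushWaitSketch {

    // Shape of the blocking path in the jstack output: wait on each incomplete batch's
    // latch in turn, with no timeout. One latch that never reaches zero blocks forever.
    static void awaitFlushCompletion(List<CountDownLatch> incomplete) throws InterruptedException {
        for (CountDownLatch result : incomplete) {
            result.await();
        }
    }

    // Hypothetical diagnostic variant (not a Kafka API): same loop under one overall deadline.
    // Returns the index of the first batch still incomplete at the deadline, or -1.
    static int firstStuckBatch(List<CountDownLatch> incomplete, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        for (int i = 0; i < incomplete.size(); i++) {
            long remaining = Math.max(0L, deadline - System.nanoTime());
            try {
                if (!incomplete.get(i).await(remaining, TimeUnit.NANOSECONDS)) {
                    return i;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return i; // treat an interrupted wait as "still incomplete"
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        CountDownLatch delivered = new CountDownLatch(1);
        CountDownLatch dropped = new CountDownLatch(1); // done() never called for this batch
        CountDownLatch alsoDelivered = new CountDownLatch(1);
        delivered.countDown();
        alsoDelivered.countDown();

        List<CountDownLatch> incomplete = Arrays.asList(delivered, dropped, alsoDelivered);
        System.out.println("first stuck batch: " + firstStuckBatch(incomplete, 100)); // 1
        // awaitFlushCompletion(incomplete) would never return here.
    }
}
```

A single lost batch is enough to stall the whole loop, even when every other batch has completed. This is consistent with the report: the hung thread waits on one ProduceRequestResult.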


client code 

  was:
There is no try/catch in NetworkClient.handleCompletedReceives. If an
exception is thrown after inFlightRequests.completeNext(source) has removed the
request, the corresponding RecordBatch's done() is never called, and
KafkaProducer.flush() hangs waiting on that RecordBatch.

I've checked the 0.10 code and believe this bug also exists in 0.10 versions.


> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4669
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4669
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.1
>            Reporter: Cheng Ju
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
