[ 
https://issues.apache.org/jira/browse/KAFKA-5073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15969595#comment-15969595
 ] 

Matthias J. Sax commented on KAFKA-5073:
----------------------------------------

Thanks for pointing that out, [~ijuma]. I just double-checked: the change was 
not simply cherry-picked. The {{0.10.2}} PR is slightly different, so the bug 
was not introduced into the {{0.10.2}} branch. Cf. the {{0.10.2}} PR of KAFKA-5053: 
https://github.com/apache/kafka/pull/2774/files#diff-045aeaddb4232a85a8560186b4901e69R588

> Kafka Streams stuck rebalancing after exception thrown in rebalance listener
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-5073
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5073
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0
>            Reporter: Xavier Léauté
>            Assignee: Matthias J. Sax
>
> An exception thrown in the Streams rebalance listener will cause the Kafka 
> consumer coordinator to log an error, but the streams app will not bubble the 
> exception up to the uncaught exception handler.
> This will leave the app stuck in the rebalancing state if, for instance, an 
> exception is thrown by the consumer during state restore.
> Here is an example log showing the error when the consumer throws a CRC 
> error during state restore.
> {code}
> [2017-04-13 14:46:41,409] ERROR [XXX-StreamThread-1] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group XXXXXXX failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:269)
> org.apache.kafka.common.KafkaException: Record batch for partition _my_topic-0 at offset 42 is invalid, cause: Record is corrupt (stored crc = 1982353474, computed crc = 1572524932)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.maybeEnsureValid(Fetcher.java:904)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:936)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:960)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:864)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:517)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:482)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1069)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
>         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:145)
>         at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:1329)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
>         at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:546)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:702)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)
> {code}
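
The failure mode in the description (a listener exception that is logged by the coordinator but never reaches the polling thread) can be illustrated with a small, self-contained sketch. Everything below is hypothetical and uses no Kafka classes: {{AssignmentListener}} and {{SwallowingCoordinator}} are stand-ins invented for illustration, and {{pollAndRethrow}} shows one possible way to surface the error after the poll returns. It is not the fix applied in the Kafka code base.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a rebalance listener; NOT the Kafka interface.
interface AssignmentListener {
    void onAssigned();
}

// Hypothetical stand-in for the coordinator: it logs a listener failure and
// carries on, mirroring the behaviour described in the report.
final class SwallowingCoordinator {
    int loggedErrors = 0;

    void poll(AssignmentListener listener) {
        try {
            listener.onAssigned();
        } catch (RuntimeException e) {
            loggedErrors++; // "logged" only, never rethrown to the caller
        }
    }
}

final class RebalanceErrorDemo {
    // Naive loop: returns true only if the listener failure reaches the caller.
    static boolean naivePollSeesFailure(SwallowingCoordinator coordinator,
                                        AssignmentListener listener) {
        try {
            coordinator.poll(listener);
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    // Sketch of a workaround: remember the failure inside the listener and
    // rethrow it on the polling thread once poll() has returned.
    static void pollAndRethrow(SwallowingCoordinator coordinator,
                               AssignmentListener delegate) {
        // A holder is needed because the lambda cannot assign a local variable.
        AtomicReference<RuntimeException> failure = new AtomicReference<>();
        coordinator.poll(() -> {
            try {
                delegate.onAssigned();
            } catch (RuntimeException caught) {
                failure.set(caught);
                throw caught; // the coordinator still sees and logs it
            }
        });
        RuntimeException pending = failure.get();
        if (pending != null) {
            throw pending; // now visible to an uncaught exception handler
        }
    }

    public static void main(String[] args) {
        AssignmentListener failing = () -> {
            throw new IllegalStateException("simulated restore failure");
        };
        SwallowingCoordinator coordinator = new SwallowingCoordinator();
        System.out.println("naive loop sees failure: "
                + naivePollSeesFailure(coordinator, failing));
        try {
            pollAndRethrow(coordinator, failing);
        } catch (IllegalStateException e) {
            System.out.println("rethrown after poll: " + e.getMessage());
        }
    }
}
```

In the naive variant the caller has no signal that the assignment callback failed, which matches the "stuck in rebalancing" symptom. The second variant trades that for a fail-fast thread exit, which is only appropriate if the application can handle a dying thread.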



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
