[ https://issues.apache.org/jira/browse/KAFKA-5073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang updated KAFKA-5073:
---------------------------------
    Resolution: Fixed
 Fix Version/s: 0.11.0.0
        Status: Resolved  (was: Patch Available)

Issue resolved by pull request 2856
[https://github.com/apache/kafka/pull/2856]

> Kafka Streams stuck rebalancing after exception thrown in rebalance listener
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-5073
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5073
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0
>            Reporter: Xavier Léauté
>            Assignee: Matthias J. Sax
>             Fix For: 0.11.0.0
>
>
> An exception thrown in the Streams rebalance listener will cause the Kafka
> consumer coordinator to log an error, but the Streams app will not bubble the
> exception up to the uncaught exception handler.
> This will leave the app stuck in the rebalancing state if, for instance, an
> exception is thrown by the consumer during state restore.
> Here is an example log that shows the error when the consumer throws a CRC
> error during state restore.
> {code}
> [2017-04-13 14:46:41,409] ERROR [XXX-StreamThread-1] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group XXXXXXX failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:269)
> org.apache.kafka.common.KafkaException: Record batch for partition _my_topic-0 at offset 42 is invalid, cause: Record is corrupt (stored crc = 1982353474, computed crc = 1572524932)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.maybeEnsureValid(Fetcher.java:904)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:936)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:960)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:864)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:517)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:482)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1069)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:145)
>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:1329)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:546)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:702)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
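The report describes an exception thrown inside the rebalance listener that the consumer coordinator logs but that never reaches the uncaught exception handler. The Java sketch below is a Kafka-free simulation of that failure mode and of one common remedy: the listener remembers the exception, and the run loop rethrows it once `poll()` returns. All types and methods here (`Coordinator`, `RememberingListener`, `runOnce`, `simulate`) are hypothetical; this is not the change from pull request 2856 and it does not use any Kafka API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, Kafka-free simulation of the KAFKA-5073 failure mode and one
// possible remedy. None of these types are Kafka APIs.
public class RebalanceExceptionSketch {

    // Hypothetical stand-in for a rebalance callback.
    interface Listener {
        void onPartitionsAssigned(List<String> partitions);
    }

    // Hypothetical coordinator: like the behaviour described in the issue, it
    // logs a listener failure instead of propagating it to the caller of poll().
    static final class Coordinator {
        private final Listener listener;
        final List<String> log = new ArrayList<>();

        Coordinator(Listener listener) {
            this.listener = listener;
        }

        void poll() {
            try {
                listener.onPartitionsAssigned(Collections.singletonList("_my_topic-0"));
            } catch (RuntimeException e) {
                log.add("ERROR listener failed on partition assignment: " + e.getMessage());
            }
        }
    }

    // Listener that records its failure so the owning thread can rethrow it.
    static final class RememberingListener implements Listener {
        volatile RuntimeException rebalanceException;

        @Override
        public void onPartitionsAssigned(List<String> partitions) {
            try {
                restoreState(partitions);
            } catch (RuntimeException e) {
                rebalanceException = e; // remember it for the run loop
                throw e;                // the coordinator still logs it
            }
        }

        // Simulates the consumer hitting a corrupt record during state restore.
        private void restoreState(List<String> partitions) {
            throw new IllegalStateException("Record is corrupt in " + partitions.get(0));
        }
    }

    // One iteration of a hypothetical run loop: poll, then surface any stored failure.
    static void runOnce(Coordinator coordinator, RememberingListener listener) {
        coordinator.poll();
        RuntimeException e = listener.rebalanceException;
        if (e != null) {
            throw new RuntimeException("Failed to rebalance", e);
        }
    }

    // Runs the loop on its own thread; returns what the uncaught exception handler saw.
    static Throwable simulate(List<String> coordinatorLog) {
        RememberingListener listener = new RememberingListener();
        Coordinator coordinator = new Coordinator(listener);
        AtomicReference<Throwable> seen = new AtomicReference<>();

        Thread streamThread = new Thread(() -> runOnce(coordinator, listener), "sketch-StreamThread-1");
        streamThread.setUncaughtExceptionHandler((t, ex) -> seen.set(ex));
        streamThread.start();
        try {
            streamThread.join(); // the handler runs on streamThread before it terminates
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the thread", e);
        }
        coordinatorLog.addAll(coordinator.log);
        return seen.get();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Throwable seen = simulate(log);
        System.out.println("coordinator log: " + log);
        System.out.println("uncaught handler saw: " + seen + ", caused by " + seen.getCause());
    }
}
```

Running `main` shows the coordinator's log line and the wrapped exception reaching the handler. If the `rebalanceException` check in `runOnce` is removed, `poll()` returns normally and `simulate` returns `null`, which mirrors the symptom in the report: the error is logged, but the handler never hears about it.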