[ https://issues.apache.org/jira/browse/FLINK-3368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15143135#comment-15143135 ]
ASF GitHub Bot commented on FLINK-3368:
---------------------------------------

GitHub user rmetzger opened a pull request:

    https://github.com/apache/flink/pull/1623

    [FLINK-3368][Kafka 0.8] Handle leader changes in Kafka Consumer.

    Please see the JIRA for an explanation of the problems. tl;dr: The Kafka 0.8
    consumer now handles broker failures internally, without relying on Flink's
    checkpointing / job recovery. The test case that previously relied on a
    topology restart now expects the consumers to handle the failure themselves.

    I also tested this change on a 7-node cluster. The job ran for 30 minutes
    and survived 4 broker shutdowns.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink3368

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1623.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1623

----
commit 5e3bc92a0e6b1250a8ae0f8d898296586851dea1
Author: Robert Metzger <rmetz...@apache.org>
Date:   2016-02-09T11:04:45Z

    [FLINK-3368][Kafka 0.8] Handle leader changes in Kafka Consumer.

----

> Kafka 0.8 consumer fails to recover from broker shutdowns
> ---------------------------------------------------------
>
>                 Key: FLINK-3368
>                 URL: https://issues.apache.org/jira/browse/FLINK-3368
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.0
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>          Priority: Blocker
>
> It seems that the Kafka Consumer (0.8) fails to restart a job after it failed
> due to a Kafka broker shutdown.
> {code}
> java.lang.Exception: Unable to get last offset for partitions [
>   FetchPartition {topic=a, partition=13, offset=-915623761776},
>   FetchPartition {topic=b, partition=13, offset=-915623761776},
>   FetchPartition {topic=c, partition=13, offset=-915623761776},
>   FetchPartition {topic=d, partition=13, offset=-915623761776},
>   FetchPartition {topic=e, partition=13, offset=-915623761776},
>   FetchPartition {topic=f, partition=13, offset=-915623761776},
>   FetchPartition {topic=g, partition=13, offset=-915623761776}].
> Exception for partition 13: kafka.common.NotLeaderForPartitionException
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
> 	at java.lang.Class.newInstance(Class.java:442)
> 	at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
> 	at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
> 	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:551)
> 	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:379)
> {code}
> I haven't understood the cause of this issue, but I'll investigate it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
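The fix described above (handling leader changes inside the consumer instead of failing the whole job) amounts to a back-off-and-rediscover loop around the leader lookup. A minimal, self-contained sketch of that pattern, with the metadata request simulated by a plain map; the class, method, and constant names here are hypothetical illustrations, not the actual LegacyFetcher code:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of leader re-discovery with bounded retries.
// In the real connector this would issue a Kafka metadata request;
// here the cluster metadata is simulated by a partition -> leader map.
public class LeaderRetrySketch {
    static final int MAX_RETRIES = 3;

    // Returns the broker id leading `partition`, retrying while a leader
    // election is in flight (leader == null) instead of throwing to the job.
    public static int findLeaderWithRetry(Map<Integer, Integer> metadata, int partition) {
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            Integer leader = metadata.get(partition);
            if (leader != null) {
                return leader; // leader known: (re)connect and resume fetching
            }
            // Leader unknown (the NotLeaderForPartitionException case):
            // back off briefly, then refresh metadata and try again.
            try {
                Thread.sleep(10L * (attempt + 1));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        throw new IllegalStateException(
            "No leader for partition " + partition + " after " + MAX_RETRIES + " retries");
    }

    public static void main(String[] args) {
        Map<Integer, Integer> metadata = new HashMap<>();
        metadata.put(13, 2); // partition 13 is now led by broker 2
        System.out.println(findLeaderWithRetry(metadata, 13)); // prints 2
    }
}
```

Only when the retries are exhausted does the error propagate, at which point Flink's checkpoint-based job recovery still applies as a last resort.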