[ https://issues.apache.org/jira/browse/FLINK-3368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15143135#comment-15143135 ]

ASF GitHub Bot commented on FLINK-3368:
---------------------------------------

GitHub user rmetzger opened a pull request:

    https://github.com/apache/flink/pull/1623

    [FLINK-3368][Kafka 0.8] Handle leader changes in Kafka Consumer.

    Please see the JIRA for an explanation of the problems.
    tl;dr: The Kafka 0.8 consumer now handles broker failures internally, 
without relying on Flink's checkpointing / job recovery.
    
    The test case, which previously relied on a topology restart, now expects the consumers to handle the failure.
    
    I also tested this change on a 7-node cluster. The job ran for 30 minutes, surviving 4 broker shutdowns.
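
    In essence, the fetcher now reacts to a leader change by re-discovering the partition's new leader from the remaining brokers and reconnecting, instead of letting the exception fail the whole job. Below is a minimal sketch of that re-discovery step against the Kafka 0.8 javaapi; the seed broker list, client id, and class name are illustrative, not the actual PR code:

{code}
import java.util.Collections;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LeaderLookup {

    // Hypothetical seed brokers; in practice these come from the consumer config.
    static final String[] SEED_BROKERS = {"broker1:9092", "broker2:9092"};

    /** Ask any live broker for topic metadata and return the partition's current leader. */
    public static PartitionMetadata findLeader(String topic, int partition) {
        for (String seed : SEED_BROKERS) {
            String host = seed.split(":")[0];
            int port = Integer.parseInt(seed.split(":")[1]);
            SimpleConsumer consumer = null;
            try {
                // Short-lived consumer, used only for the metadata request.
                consumer = new SimpleConsumer(host, port, 10000, 64 * 1024, "leaderLookup");
                TopicMetadataRequest request =
                        new TopicMetadataRequest(Collections.singletonList(topic));
                TopicMetadataResponse response = consumer.send(request);

                for (TopicMetadata tm : response.topicsMetadata()) {
                    for (PartitionMetadata pm : tm.partitionsMetadata()) {
                        if (pm.partitionId() == partition && pm.leader() != null) {
                            return pm; // pm.leader() carries the new leader's host/port
                        }
                    }
                }
            } catch (Exception e) {
                // This seed may be the broker that was shut down; try the next one.
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        return null; // no leader elected yet; the caller should back off and retry
    }
}
{code}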

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink3368

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1623.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1623
    
----
commit 5e3bc92a0e6b1250a8ae0f8d898296586851dea1
Author: Robert Metzger <rmetz...@apache.org>
Date:   2016-02-09T11:04:45Z

    [FLINK-3368][Kafka 0.8] Handle leader changes in Kafka Consumer.

----


> Kafka 0.8 consumer fails to recover from broker shutdowns
> ---------------------------------------------------------
>
>                 Key: FLINK-3368
>                 URL: https://issues.apache.org/jira/browse/FLINK-3368
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.0
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>            Priority: Blocker
>
> It seems that the Kafka Consumer (0.8) fails to restart a job after the job has failed due to a Kafka broker shutdown.
> {code}
> java.lang.Exception: Unable to get last offset for partitions [FetchPartition {topic=a, partition=13, offset=-915623761776}, FetchPartition {topic=b, partition=13, offset=-915623761776}, FetchPartition {topic=c, partition=13, offset=-915623761776}, FetchPartition {topic=d, partition=13, offset=-915623761776}, FetchPartition {topic=e, partition=13, offset=-915623761776}, FetchPartition {topic=f, partition=13, offset=-915623761776}, FetchPartition {topic=g, partition=13, offset=-915623761776}].
> Exception for partition 13: kafka.common.NotLeaderForPartitionException
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>       at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>       at java.lang.Class.newInstance(Class.java:442)
>       at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>       at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>       at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:551)
>       at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:379)
> {code}
> I don't yet understand the cause of this issue, but I'll investigate it.
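
For context, the getLastOffset call in the stack trace corresponds roughly to the standard Kafka 0.8 SimpleConsumer offset-request pattern sketched below. This is a sketch, not the actual LegacyFetcher code; the class and parameter names are illustrative:

{code}
import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LastOffsetSketch {

    // whichTime is kafka.api.OffsetRequest.LatestTime() or EarliestTime().
    static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                              long whichTime, String clientName) {
        TopicAndPartition tp = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
        requestInfo.put(tp, new PartitionOffsetRequestInfo(whichTime, 1));

        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            short code = response.errorCode(topic, partition);
            // ErrorMapping turns the broker's error code into the exception seen
            // above, e.g. NotLeaderForPartitionException after a leader change.
            throw new RuntimeException("Unable to get last offset for " + topic
                    + "/" + partition, ErrorMapping.exceptionFor(code));
        }
        return response.offsets(topic, partition)[0];
    }
}
{code}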



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
