[ https://issues.apache.org/jira/browse/FLINK-3368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15153030#comment-15153030 ]

ASF GitHub Bot commented on FLINK-3368:
---------------------------------------

Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1623#issuecomment-185903144
  
    I think this is in okay shape, quite an improvement over the current 
version.
    
    There are some things I suggest re-examining:
    
    
    Method `commitOffsets()` in "FlinkKafkaConsumer08:397" is missing the `@Override` annotation.
    
    The TODO "maybe add a check again here if we are still running" seems worth doing.
    
    In many places, you do `if (!closableQueue.addIfOpen(...)) { error }`. Simply use the `add()` method, which errors if the queue is not open. Also (LegacyFetcher:120): there can never be an IllegalStateException while initially loading in the constructor.
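    For reference, the pattern being suggested can be sketched like this (a minimal hypothetical queue, not Flink's actual `ClosableBlockingQueue`): `add()` throws on a closed queue, so callers no longer need the manual `if (!addIfOpen(...))` check.

    ```java
    import java.util.ArrayDeque;
    import java.util.Queue;

    // Hypothetical sketch of the two methods being discussed.
    public class SimpleClosableQueue<E> {
        private final Queue<E> elements = new ArrayDeque<>();
        private boolean open = true;

        // Returns false instead of throwing when the queue is closed.
        public synchronized boolean addIfOpen(E element) {
            if (open) {
                elements.add(element);
                return true;
            }
            return false;
        }

        // Throws directly, removing the need for manual error handling at call sites.
        public synchronized void add(E element) {
            if (!open) {
                throw new IllegalStateException("queue is closed");
            }
            elements.add(element);
        }

        public synchronized void close() {
            open = false;
        }

        public synchronized boolean isOpen() {
            return open;
        }

        public synchronized int size() {
            return elements.size();
        }
    }
    ```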
    
    I do not see the use of the list `deadBrokerThreads` in the legacy fetcher. Unless I am overlooking something, I would remove it (to simplify the code). Since the threads will eventually shut down anyway, I think you need not sync on them when leaving the `run()` method of the legacy fetcher.
    
    The check whether a fetcher thread is alive should probably be `thread.getNewPartitionsQueue().isOpen()`, rather than `thread.isAlive()`. The queue's open flag is what is changed and checked atomically together with the adding of partitions.
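    The distinction can be sketched as follows (hypothetical stub, not the real fetcher): the thread closes its new-partitions queue as its last consuming action and then spends time winding down. During that window `isAlive()` is still true, but a partition added now would never be drained, so "queue open", not "thread alive", is the signal that matters.

    ```java
    import java.util.concurrent.atomic.AtomicBoolean;

    // Hedged sketch with illustrative names; the real code would close an
    // actual queue rather than flip a plain flag.
    public class LivenessCheck {

        static class FetcherThreadStub extends Thread {
            final AtomicBoolean queueOpen = new AtomicBoolean(true);

            @Override
            public void run() {
                queueOpen.set(false);          // stop accepting partitions
                try {
                    Thread.sleep(200);         // cleanup: alive, but not consuming
                } catch (InterruptedException ignored) {
                }
            }

            boolean canStillAcceptPartitions() {
                return queueOpen.get();        // the reliable check
            }
        }

        public static void main(String[] args) throws Exception {
            FetcherThreadStub t = new FetcherThreadStub();
            t.start();
            Thread.sleep(50);                  // queue likely closed, thread still alive
            System.out.println("alive=" + t.isAlive()
                    + " accepts=" + t.canStillAcceptPartitions());
            t.join();
        }
    }
    ```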
    
    The fetcher's main thread always blocks for 5 seconds before it can notice that the broker threads have shut down.
    I am wondering if we can make that snappier by waking the main thread up when a fetcher thread terminates (by adding a marker element to the queue).
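    The marker-element idea might look roughly like this (hypothetical names; a sketch, not the connector's actual code): each terminating thread enqueues a sentinel, which unblocks the main thread's `poll()` immediately instead of after the full 5 s timeout.

    ```java
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Sketch of waking a timed poll via a marker ("poison pill") element.
    public class MarkerWakeup {
        // sentinel object; never handed to the user, only used to wake the poller
        static final Object THREAD_TERMINATED = new Object();

        // returns true if the marker arrived well before the 5 s poll timeout
        static boolean wakesPromptly() throws Exception {
            BlockingQueue<Object> results = new LinkedBlockingQueue<>();

            Thread brokerThread = new Thread(() -> {
                // ... fetch loop would run here; on exit, wake the main thread
                results.add(THREAD_TERMINATED);
            });

            long start = System.nanoTime();
            brokerThread.start();
            Object next = results.poll(5, TimeUnit.SECONDS);
            long waitedMillis = (System.nanoTime() - start) / 1_000_000;
            brokerThread.join();

            return next == THREAD_TERMINATED && waitedMillis < 5_000;
        }

        public static void main(String[] args) throws Exception {
            System.out.println(wakesPromptly());   // no 5 s stall
        }
    }
    ```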
    
    
    Method `findLeaderForPartitions(...)` in the legacy fetcher also fails hard 
once it cannot find the leader for a partition (no retries). Is that intended?
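    If retries are intended there, one possible shape is a small bounded-retry helper (hypothetical names and policy; the real connector would also need to honor cancellation):

    ```java
    import java.util.concurrent.Callable;

    // Sketch of wrapping a leader lookup in bounded retries with a fixed
    // backoff, instead of failing hard on the first miss.
    public class Retry {

        static <T> T withRetries(Callable<T> lookup, int maxAttempts, long backoffMillis)
                throws Exception {
            Exception last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return lookup.call();
                } catch (Exception e) {
                    last = e;
                    if (attempt < maxAttempts) {
                        Thread.sleep(backoffMillis);   // wait before the next attempt
                    }
                }
            }
            throw new RuntimeException(
                    "no leader found after " + maxAttempts + " attempts", last);
        }
    }
    ```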
    
    The code uses `Integer.valueOf()` everywhere, creating a boxed Integer and then unboxing it. `Integer.parseInt(...)` is the preferable choice; almost always, I would drop `Integer.valueOf()` entirely.
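    For illustration, the difference in a nutshell: both parse the same digits, but `valueOf` returns a boxed `Integer` that is immediately unboxed again.

    ```java
    // parseInt yields a primitive directly; valueOf goes through an Integer box.
    public class ParseVsValueOf {
        public static void main(String[] args) {
            int a = Integer.parseInt("397");    // primitive, no box
            int b = Integer.valueOf("397");     // Integer created, then auto-unboxed
            System.out.println(a == b);         // same value, extra work for b
        }
    }
    ```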
    
    This loop (in `findLeaderForPartitions(...)`) is either too nifty for me to comprehend, or bogus:
    
    ```java
    List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();
    List<FetchPartition> partitionsToAssignInternal = new ArrayList<>(partitionsToAssign);

    Map<Node, List<FetchPartition>> leaderToPartitions = new HashMap<>();

    for (KafkaTopicPartitionLeader partitionLeader : topicPartitionWithLeaderList) {
        if (partitionsToAssignInternal.size() == 0) {
            // we are done: all partitions are assigned
            break;
        }
        Iterator<FetchPartition> fpIter = partitionsToAssignInternal.iterator();
        while (fpIter.hasNext()) {
            FetchPartition fp = fpIter.next();
            if (fp.topic.equals(partitionLeader.getTopicPartition().getTopic())
                    && fp.partition == partitionLeader.getTopicPartition().getPartition()) {

                // we found the leader for one of the fetch partitions
                Node leader = partitionLeader.getLeader();
                List<FetchPartition> partitionsOfLeader = leaderToPartitions.get(leader);
                if (partitionsOfLeader == null) {
                    partitionsOfLeader = new ArrayList<>();
                    leaderToPartitions.put(leader, partitionsOfLeader);
                }
                partitionsOfLeader.add(fp);
                fpIter.remove();
                break;
            }
        }
    }
    ```
    Does this do anything different from the version below? (The inner iteration always finds the exact same element at the first position and breaks.)
    ```java
    List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();
    Map<Node, List<FetchPartition>> leaderToPartitions = new HashMap<>();

    for (KafkaTopicPartitionLeader partitionLeader : topicPartitionWithLeaderList) {
        Node leader = partitionLeader.getLeader();
        List<FetchPartition> partitionsOfLeader = leaderToPartitions.get(leader);
        if (partitionsOfLeader == null) {
            partitionsOfLeader = new ArrayList<>();
            leaderToPartitions.put(leader, partitionsOfLeader);
        }
        // fp: the FetchPartition from partitionsToAssign that matches
        // partitionLeader's topic and partition
        partitionsOfLeader.add(fp);
    }
    ```
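    As an aside, on Java 8+ the get-or-create grouping in either version can be collapsed with `Map.computeIfAbsent` (a sketch using `String`/`Integer` stand-ins for `Node`/`FetchPartition`):

    ```java
    import java.util.AbstractMap;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Same grouping idiom, with computeIfAbsent handling the "missing list" case.
    public class GroupByLeader {
        static Map<String, List<Integer>> group(List<Map.Entry<String, Integer>> leaderPartitionPairs) {
            Map<String, List<Integer>> leaderToPartitions = new HashMap<>();
            for (Map.Entry<String, Integer> pair : leaderPartitionPairs) {
                leaderToPartitions
                        .computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                        .add(pair.getValue());
            }
            return leaderToPartitions;
        }
    }
    ```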


> Kafka 0.8 consumer fails to recover from broker shutdowns
> ---------------------------------------------------------
>
>                 Key: FLINK-3368
>                 URL: https://issues.apache.org/jira/browse/FLINK-3368
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.0
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>            Priority: Blocker
>
> It seems that the Kafka Consumer (0.8) fails to restart a job after it failed 
> due to a Kafka broker shutdown.
> {code}
> java.lang.Exception: Unable to get last offset for partitions [FetchPartition 
> {topic=a, partition=13, offset=-915623761776}, FetchPartition {topic=b, 
> partition=13, offset=-915623761776}, FetchPartition {topic=c, partition=13, 
> offset=-915623761776}, FetchPartition {topic=d, partition=13, 
> offset=-915623761776}, FetchPartition {topic=e, partition=13, 
> offset=-915623761776}, FetchPartition {topic=f, partition=13, 
> offset=-915623761776}, FetchPartition {topic=g, partition=13, 
> offset=-915623761776}].
> Exception for partition 13: kafka.common.NotLeaderForPartitionException
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>       at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>       at java.lang.Class.newInstance(Class.java:442)
>       at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>       at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:551)
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:379)
> {code}
> I haven't understood the cause of this issue, but I'll investigate it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
