[ https://issues.apache.org/jira/browse/FLINK-3368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15153030#comment-15153030 ]
ASF GitHub Bot commented on FLINK-3368:
---------------------------------------

Github user StephanEwen commented on the pull request:
https://github.com/apache/flink/pull/1623#issuecomment-185903144

I think this is in okay shape, quite an improvement over the current version. There are some things I suggest to re-examine:

- Method `commitOffsets()` in `FlinkKafkaConsumer08:397` is missing the `@Override` annotation.
- The "TODO: maybe add a check again here if we are still running." seems worth doing.
- In many places, you do `if (!closableQueue.addIfOpen(...)) { error }`. Simply use the `add()` method, which errors if the queue is not open (see the first sketch at the bottom of this page). Also (LegacyFetcher:120): there can never be an IllegalStateException while initially loading in the constructor.
- I do not see the use of the list `deadBrokerThreads` in the legacy fetcher. Unless I am overlooking something, I would remove it and simplify the code.
- Since they will eventually shut down anyway, I think you need not sync on the broker threads when leaving the `run()` method of the legacy fetcher.
- The check whether a fetcher thread is alive should probably be `thread.getNewPartitionsQueue().isOpen()` rather than `thread.isAlive()`. That is the flag that is atomically changed and checked when partitions are added.
- The fetcher's main thread always blocks for 5 seconds before it can notice that the broker threads shut down. I am wondering if we can make that more snappy by waking the main thread up when a fetcher thread terminates (by adding a marker element to the queue; see the second sketch at the bottom of this page).
- Method `findLeaderForPartitions(...)` in the legacy fetcher also fails hard once it cannot find the leader for a partition (no retries). Is that intended?
- The code uses `Integer.valueOf()` everywhere, creating a boxed integer and then unboxing it. `Integer.parseInt(...)` is the preferable choice; I would almost always avoid `Integer.valueOf()` entirely.
- This loop (in `findLeaderForPartitions(...)`) is either too nifty for me to comprehend, or bogus:

```java
List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();
List<FetchPartition> partitionsToAssignInternal = new ArrayList<>(partitionsToAssign);
Map<Node, List<FetchPartition>> leaderToPartitions = new HashMap<>();

for (KafkaTopicPartitionLeader partitionLeader : topicPartitionWithLeaderList) {
    if (partitionsToAssignInternal.size() == 0) {
        // we are done: all partitions are assigned
        break;
    }

    Iterator<FetchPartition> fpIter = partitionsToAssignInternal.iterator();
    while (fpIter.hasNext()) {
        FetchPartition fp = fpIter.next();
        if (fp.topic.equals(partitionLeader.getTopicPartition().getTopic())
                && fp.partition == partitionLeader.getTopicPartition().getPartition()) {
            // we found the leader for one of the fetch partitions
            Node leader = partitionLeader.getLeader();

            List<FetchPartition> partitionsOfLeader = leaderToPartitions.get(leader);
            if (partitionsOfLeader == null) {
                partitionsOfLeader = new ArrayList<>();
                leaderToPartitions.put(leader, partitionsOfLeader);
            }
            partitionsOfLeader.add(fp);
            fpIter.remove();
            break;
        }
    }
}
```

Does this do anything different from the version below? (The inner iteration always finds the exact same element at the first position and breaks.)
```java
List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();
Map<Node, List<FetchPartition>> leaderToPartitions = new HashMap<>();

for (KafkaTopicPartitionLeader partitionLeader : topicPartitionWithLeaderList) {
    Node leader = partitionLeader.getLeader();

    List<FetchPartition> partitionsOfLeader = leaderToPartitions.get(leader);
    if (partitionsOfLeader == null) {
        partitionsOfLeader = new ArrayList<>();
        leaderToPartitions.put(leader, partitionsOfLeader);
    }
    // note: `fp` is not defined in this simplified version; the matching
    // FetchPartition from partitionsToAssign would still have to be looked up
    partitionsOfLeader.add(fp);
}
```

> Kafka 0.8 consumer fails to recover from broker shutdowns
> ----------------------------------------------------------
>
>                 Key: FLINK-3368
>                 URL: https://issues.apache.org/jira/browse/FLINK-3368
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.0
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>            Priority: Blocker
>
> It seems that the Kafka Consumer (0.8) fails to restart a job after it failed
> due to a Kafka broker shutdown.
> {code}
> java.lang.Exception: Unable to get last offset for partitions [FetchPartition {topic=a, partition=13, offset=-915623761776},
> FetchPartition {topic=b, partition=13, offset=-915623761776},
> FetchPartition {topic=c, partition=13, offset=-915623761776},
> FetchPartition {topic=d, partition=13, offset=-915623761776},
> FetchPartition {topic=e, partition=13, offset=-915623761776},
> FetchPartition {topic=f, partition=13, offset=-915623761776},
> FetchPartition {topic=g, partition=13, offset=-915623761776}].
> Exception for partition 13: kafka.common.NotLeaderForPartitionException
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>     at java.lang.Class.newInstance(Class.java:442)
>     at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>     at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:551)
>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:379)
> {code}
> I haven't understood the cause of this issue, but I'll investigate it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
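To make the `addIfOpen()`/`add()` point from the review concrete, here is a minimal, hypothetical closable queue. It is only a sketch of the semantics the comment relies on; the class and method names are assumptions, not the actual class from the pull request.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical minimal closable queue, sketching the addIfOpen()/add() contract
// described in the review comment.
public class ClosableQueueSketch<E> {

    private final Queue<E> elements = new ArrayDeque<>();
    private boolean open = true;

    /** Returns false instead of failing when the queue has been closed. */
    public synchronized boolean addIfOpen(E element) {
        if (!open) {
            return false;
        }
        elements.add(element);
        return true;
    }

    /** Fails fast when the queue is closed, so callers need no manual check. */
    public synchronized void add(E element) {
        if (!open) {
            throw new IllegalStateException("Queue is closed");
        }
        elements.add(element);
    }

    /** After close(), add() throws and addIfOpen() returns false. */
    public synchronized void close() {
        open = false;
    }

    public synchronized boolean isOpen() {
        return open;
    }
}
```

With these semantics, call sites can simply write `closableQueue.add(partition)` and rely on the thrown `IllegalStateException`, instead of checking the return value of `addIfOpen(...)` and raising the error by hand.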
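The "marker element" suggestion could look roughly like the following sketch. All names here are hypothetical; the point is only to wake the blocked main thread immediately when a fetcher thread terminates, instead of waiting out the 5-second poll timeout.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: a terminating broker/fetcher thread pushes a sentinel
// into the queue the main thread blocks on, so the main thread notices the
// termination right away rather than after its fixed poll timeout.
public class MarkerWakeupSketch {

    // Sentinel object; never a real element, only used to interrupt the wait.
    private static final Object WAKEUP_MARKER = new Object();

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    /** Called by a broker/fetcher thread right before it terminates. */
    public void signalThreadTermination() {
        queue.offer(WAKEUP_MARKER);
    }

    /** Main loop: blocks on the queue instead of polling with a fixed timeout. */
    public void runMainLoop() throws InterruptedException {
        while (true) {
            Object next = queue.take(); // returns immediately once the marker arrives
            if (next == WAKEUP_MARKER) {
                // a fetcher thread died; re-check thread liveness and partitions now
                break;
            }
            // ... handle a regular queue element ...
        }
    }
}
```

A real implementation would use the fetcher's existing queue and a typed sentinel rather than a plain `Object`, but the control flow is the same: the terminating thread enqueues the marker, and the main loop reacts as soon as it dequeues it.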