Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2454#discussion_r156224911
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
    @@ -223,29 +220,24 @@ private long doSeek(TopicPartition tp, 
OffsetAndMetadata committedOffset) {
         @Override
         public void nextTuple() {
             try {
    -            if (initialized) {             
    --- End diff --
    
    I'm not sure it was ever necessary.
    
    When we were using the subscribe API the consumer rebalance code would be 
running as part of a call to KafkaConsumer.poll, initially from e.g. the 
NamedSubscription.subscribe method called by KafkaSpout.activate. The call to 
poll blocks until the initial rebalance is complete.
    
    Since switching to the assign API, the rebalance code is running as part of 
the call to Subscription.subscribe/refreshPartitions, still initially called by 
KafkaSpout.activate.
    
    The rebalance listener is always called synchronously as part of the 
Subscription.subscribe call in KafkaSpout.activate, so it shouldn't be possible 
to reach KafkaSpout.nextTuple without initialized being true, because it will 
be true once KafkaSpout.activate returns. 


---

Reply via email to