Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2454#discussion_r156224911 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -223,29 +220,24 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { @Override public void nextTuple() { try { - if (initialized) { --- End diff -- I'm not sure it was ever necessary. When we were using the subscribe API the consumer rebalance code would be running as part of a call to KafkaConsumer.poll, initially from e.g. the NamedSubscription.subscribe method called by KafkaSpout.activate. The call to poll blocks until the initial rebalance is complete. Since switching to the assign API, the rebalance code is running as part of the call to Subscription.subscribe/refreshPartitions, still initially called by KafkaSpout.activate. The rebalance listener is always called synchronously as part of the Subscription.subscribe call in KafkaSpout.activate, so it shouldn't be possible to reach KafkaSpout.nextTuple without initialized being true, because it will be true once KafkaSpout.activate returns.
---