GitHub user janithkv commented on a diff in the pull request: https://github.com/apache/storm/pull/2907#discussion_r233697180 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java --- @@ -218,26 +228,27 @@ private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record) * <ul> * <li>This is the first batch for this partition</li> * <li>This is a replay of the first batch for this partition</li> - * <li>This is batch n for this partition, where batch 0...n-1 were all empty</li> * </ul> * * @return the offset of the next fetch */ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) { - if (isFirstPoll(tp)) { - if (firstPollOffsetStrategy == EARLIEST) { + if (isFirstPollSinceExecutorStarted(tp)) { + boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null + || !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId()); + if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) { LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp); consumer.seekToBeginning(Collections.singleton(tp)); - } else if (firstPollOffsetStrategy == LATEST) { + } else if (firstPollOffsetStrategy == LATEST && isFirstPollSinceTopologyWasDeployed) { LOG.debug("First poll for topic partition [{}], seeking to partition end", tp); consumer.seekToEnd(Collections.singleton(tp)); } else if (lastBatchMeta != null) { LOG.debug("First poll for topic partition [{}], using last batch metadata", tp); consumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch - } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) { + } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST || firstPollOffsetStrategy == EARLIEST) { --- End diff -- Will we ever hit the firstPollOffsetStrategy == EARLIEST case here?
---