GitHub user janithkv commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2907#discussion_r233697224
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
    @@ -218,26 +228,27 @@ private void emitTuple(TridentCollector collector, 
ConsumerRecord<K, V> record)
          * <ul>
          * <li>This is the first batch for this partition</li>
          * <li>This is a replay of the first batch for this partition</li>
    -     * <li>This is batch n for this partition, where batch 0...n-1 were 
all empty</li>
          * </ul>
          *
          * @return the offset of the next fetch
          */
         private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata 
lastBatchMeta) {
    -        if (isFirstPoll(tp)) {
    -            if (firstPollOffsetStrategy == EARLIEST) {
    +        if (isFirstPollSinceExecutorStarted(tp)) {
    +            boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == 
null 
    +                || 
!topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
    +            if (firstPollOffsetStrategy == EARLIEST && 
isFirstPollSinceTopologyWasDeployed) {
                     LOG.debug("First poll for topic partition [{}], seeking to 
partition beginning", tp);
                     consumer.seekToBeginning(Collections.singleton(tp));
    -            } else if (firstPollOffsetStrategy == LATEST) {
    +            } else if (firstPollOffsetStrategy == LATEST && 
isFirstPollSinceTopologyWasDeployed) {
                     LOG.debug("First poll for topic partition [{}], seeking to 
partition end", tp);
                     consumer.seekToEnd(Collections.singleton(tp));
                 } else if (lastBatchMeta != null) {
                     LOG.debug("First poll for topic partition [{}], using last 
batch metadata", tp);
                     consumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // 
seek next offset after last offset from previous batch
    -            } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
    +            } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST || 
firstPollOffsetStrategy == EARLIEST) {
                     LOG.debug("First poll for topic partition [{}] with no 
last batch metadata, seeking to partition beginning", tp);
                     consumer.seekToBeginning(Collections.singleton(tp));
    -            } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) {
    +            } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST || 
firstPollOffsetStrategy == LATEST) {
    --- End diff --
    
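    Will we ever hit the `firstPollOffsetStrategy == LATEST` case here? This branch is only reached when `lastBatchMeta == null`, which makes `isFirstPollSinceTopologyWasDeployed` true, so `LATEST` would already have been handled by the earlier `LATEST && isFirstPollSinceTopologyWasDeployed` branch.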


---

Reply via email to