GitHub user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2907#discussion_r234329136
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
    @@ -170,15 +170,25 @@ public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collect
     
                 seek(currBatchTp, lastBatchMeta);
     
    -            final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
    -            LOG.debug("Polled [{}] records from Kafka.", records.count());
    +            final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
    +            LOG.debug("Polled [{}] records from Kafka.", records.size());
     
                 if (!records.isEmpty()) {
                     for (ConsumerRecord<K, V> record : records) {
                         emitTuple(collector, record);
                     }
    -                // build new metadata
    -                currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp), this.topologyContext.getStormId());
    +                // build new metadata based on emitted records
    +                currentBatch = new KafkaTridentSpoutBatchMetadata(
    +                    records.get(0).offset(),
    +                    records.get(records.size() - 1).offset(),
    +                    topologyContext.getStormId());
    +            } else {
    +                //Build new metadata based on the consumer position.
    +                //We want the next emit to start at the current consumer position,
    +                //so make a meta that indicates that position - 1 is the last emitted offset
    +                //This helps us avoid cases like STORM-3279, and simplifies the seek logic.
    +                long lastEmittedOffset = consumer.position(currBatchTp) - 1;
    --- End diff --
    
    Is there any chance that `lastEmittedOffset` could end up negative? I am
    thinking specifically of the case where we start a topology that reads from
    a new or empty topic.
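
To make the question concrete, here is a minimal sketch of the arithmetic only, not the spout's actual code. It assumes the consumer position on a new or empty partition is 0, and that the next batch resumes one past the last emitted offset, as the comment in the diff suggests. The class and method names are hypothetical.

```java
/**
 * Hypothetical helper, not part of storm-kafka-client.
 * It models only the offset arithmetic from the else branch in the diff.
 */
final class EmptyBatchOffsetSketch {

    private EmptyBatchOffsetSketch() {
    }

    /** Mirrors `consumer.position(currBatchTp) - 1` for a given consumer position. */
    static long lastEmittedOffset(long consumerPosition) {
        return consumerPosition - 1;
    }

    /** Assumption: the next emit starts one past the last emitted offset. */
    static long nextEmitOffset(long lastEmittedOffset) {
        return lastEmittedOffset + 1;
    }
}
```

Under these assumptions, a position of 0 gives a last emitted offset of -1, and the next emit would start at offset 0 again. So the round trip itself looks harmless, but any code that treats the stored offset as a real record offset would need to tolerate -1.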

