GitHub user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2907#discussion_r234338505
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
    @@ -170,15 +170,25 @@ public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collect
     
                 seek(currBatchTp, lastBatchMeta);
     
    -            final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
    -            LOG.debug("Polled [{}] records from Kafka.", records.count());
    +            final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
    +            LOG.debug("Polled [{}] records from Kafka.", records.size());
     
                 if (!records.isEmpty()) {
                     for (ConsumerRecord<K, V> record : records) {
                         emitTuple(collector, record);
                     }
    -                // build new metadata
    -                currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp), this.topologyContext.getStormId());
    +                // build new metadata based on emitted records
    +                currentBatch = new KafkaTridentSpoutBatchMetadata(
    +                    records.get(0).offset(),
    +                    records.get(records.size() - 1).offset(),
    +                    topologyContext.getStormId());
    +            } else {
    +                //Build new metadata based on the consumer position.
    +                //We want the next emit to start at the current consumer position,
    +                //so make a meta that indicates that position - 1 is the last emitted offset
    +                //This helps us avoid cases like STORM-3279, and simplifies the seek logic.
    +                long lastEmittedOffset = consumer.position(currBatchTp) - 1;
    --- End diff --
    
    Yes, then the starting offset may be 0 and lastEmittedOffset could be -1.
    
    The `seek` method used by `emitPartitionBatchNew` always adds 1 to the last 
batch meta offset, so we'd be seeking to 0 or larger.
    
    `reEmitPartitionBatch` does use the last batch meta offset without adding 
1, but if we hit this case there are no emits in the batch, so Trident 
shouldn't replay it. I'm happy to add guards for -1 to `reEmitPartitionBatch` 
though, since I don't feel certain that Trident could never replay an empty 
batch.
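
A rough sketch of the offset arithmetic described above, including a possible guard for the -1 case. `SeekOffsetSketch` and its method names are hypothetical and not part of storm-kafka-client; the sketch assumes the batch metadata stores a single "last emitted offset" and that an empty batch on an empty partition records -1. It only illustrates the idea and is not the change made in this pull request.

```java
import java.util.OptionalLong;

// Hypothetical helper for illustration only; not part of storm-kafka-client.
final class SeekOffsetSketch {
    private SeekOffsetSketch() {
    }

    // New batch: start one past the last emitted offset.
    // A stored value of -1 (nothing emitted yet) therefore maps to offset 0.
    static long seekOffsetForNewBatch(long lastEmittedOffset) {
        return lastEmittedOffset + 1;
    }

    // Replay: the stored offset is used as-is, so a negative value must not
    // reach the consumer. An empty result means "nothing to seek to".
    static OptionalLong seekOffsetForReEmit(long storedOffset) {
        if (storedOffset < 0) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(storedOffset);
    }

    public static void main(String[] args) {
        System.out.println(seekOffsetForNewBatch(-1L));
        System.out.println(seekOffsetForReEmit(-1L).isPresent());
        System.out.println(seekOffsetForReEmit(5L).getAsLong());
    }
}
```

Returning an empty `OptionalLong` rather than throwing leaves the caller free to treat a replayed empty batch as a no-op, which matches the expectation that such a batch has nothing to emit.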
    
    Any other reason you can see that we should be aware of negative offsets?

