GitHub user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2907#discussion_r234338505
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
@@ -170,15 +170,25 @@ public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collect
seek(currBatchTp, lastBatchMeta);
- final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
- LOG.debug("Polled [{}] records from Kafka.", records.count());
+ final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
+ LOG.debug("Polled [{}] records from Kafka.", records.size());
if (!records.isEmpty()) {
for (ConsumerRecord<K, V> record : records) {
emitTuple(collector, record);
}
- // build new metadata
- currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp), this.topologyContext.getStormId());
+ // build new metadata based on emitted records
+ currentBatch = new KafkaTridentSpoutBatchMetadata(
+ records.get(0).offset(),
+ records.get(records.size() - 1).offset(),
+ topologyContext.getStormId());
+ } else {
+ //Build new metadata based on the consumer position.
+ //We want the next emit to start at the current consumer position,
+ //so make a meta that indicates that position - 1 is the last emitted offset
+ //This helps us avoid cases like STORM-3279, and simplifies the seek logic.
+ long lastEmittedOffset = consumer.position(currBatchTp) - 1;
--- End diff --
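For reference, a dependency-free sketch of the metadata rule in the diff. `BatchMeta`, `buildMeta`, and the use of plain `long` offsets in place of `ConsumerRecord`s are hypothetical stand-ins, not Storm or Kafka API. The quoted diff stops right after `lastEmittedOffset` is computed, so setting both the first and last offsets to that value in the empty case is an assumption.

```java
import java.util.List;

public class BatchMetaSketch {
    // Hypothetical stand-in for KafkaTridentSpoutBatchMetadata:
    // the first and last offsets covered by a batch (inclusive).
    static final class BatchMeta {
        final long firstOffset;
        final long lastOffset;

        BatchMeta(long firstOffset, long lastOffset) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }
    }

    // emittedOffsets: offsets of the records emitted for this batch, in poll order.
    // consumerPosition: the consumer's position for the partition after polling.
    static BatchMeta buildMeta(List<Long> emittedOffsets, long consumerPosition) {
        if (!emittedOffsets.isEmpty()) {
            // Metadata spans exactly the records that were emitted.
            return new BatchMeta(emittedOffsets.get(0), emittedOffsets.get(emittedOffsets.size() - 1));
        }
        // Nothing emitted: record position - 1 as the last emitted offset, so a
        // next batch that seeks to (last + 1) starts at the current position.
        // Assumption: first and last are both set to this value.
        long lastEmittedOffset = consumerPosition - 1;
        return new BatchMeta(lastEmittedOffset, lastEmittedOffset);
    }

    public static void main(String[] args) {
        BatchMeta full = buildMeta(List.of(5L, 6L, 7L), 8L);
        BatchMeta empty = buildMeta(List.of(), 0L);
        System.out.println(full.firstOffset + ".." + full.lastOffset);   // 5..7
        System.out.println(empty.firstOffset + ".." + empty.lastOffset); // -1..-1
    }
}
```

The second call is the case discussed below: an empty partition at position 0 yields a meta whose last emitted offset is -1.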
Yes, in that case the starting offset may be 0, and lastEmittedOffset would be -1.
The `seek` method used by `emitPartitionBatchNew` always adds 1 to the last batch meta offset, so we'd be seeking to 0 or higher.
`reEmitPartitionBatch` does use the last batch meta offset without adding 1, but if we hit this case, nothing was emitted in the batch, so Trident shouldn't replay it. I'm happy to add guards for -1 to `reEmitPartitionBatch` though, since I'm not certain that Trident can never replay an empty batch.
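A sketch of the two seek paths, to make the -1 case concrete. The method names `newBatchSeekOffset` and `reEmitSeekOffset`, and the `OptionalLong` "skip the seek" guard, are hypothetical and not existing `KafkaTridentSpoutEmitter` code; the guard is just one possible shape for the -1 check in `reEmitPartitionBatch`. A guard matters because recent kafka-clients versions make `KafkaConsumer.seek` reject negative offsets with an `IllegalArgumentException`.

```java
import java.util.OptionalLong;

public class SeekGuardSketch {
    // New-batch path: start right after the previous batch's last emitted offset,
    // so a sentinel of -1 becomes offset 0, which is never negative.
    static long newBatchSeekOffset(long previousLastEmittedOffset) {
        return previousLastEmittedOffset + 1;
    }

    // Re-emit path: the stored offset is used as-is, so -1 would be an invalid
    // seek target. Hypothetical guard: report "nothing to replay" instead of seeking.
    static OptionalLong reEmitSeekOffset(long storedOffset) {
        if (storedOffset < 0) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(storedOffset);
    }

    public static void main(String[] args) {
        System.out.println(newBatchSeekOffset(-1L));           // 0
        System.out.println(reEmitSeekOffset(-1L).isPresent()); // false
        System.out.println(reEmitSeekOffset(7L).getAsLong());  // 7
    }
}
```

Returning an empty `OptionalLong` rather than throwing keeps a replay of an empty batch a no-op, which matches the expectation that such a batch has nothing to re-emit.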
Is there any other reason you can see that we should watch out for negative offsets?
---