Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2907#discussion_r241782960
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
---
@@ -170,15 +170,25 @@ public void reEmitPartitionBatch(TransactionAttempt
tx, TridentCollector collect
seek(currBatchTp, lastBatchMeta);
- final ConsumerRecords<K, V> records =
consumer.poll(pollTimeoutMs);
- LOG.debug("Polled [{}] records from Kafka.", records.count());
+ final List<ConsumerRecord<K, V>> records =
consumer.poll(pollTimeoutMs).records(currBatchTp);
+ LOG.debug("Polled [{}] records from Kafka.", records.size());
if (!records.isEmpty()) {
for (ConsumerRecord<K, V> record : records) {
emitTuple(collector, record);
}
- // build new metadata
- currentBatch = new
KafkaTridentSpoutBatchMetadata(records.records(currBatchTp),
this.topologyContext.getStormId());
+ // build new metadata based on emitted records
+ currentBatch = new KafkaTridentSpoutBatchMetadata(
+ records.get(0).offset(),
+ records.get(records.size() - 1).offset(),
+ topologyContext.getStormId());
+ } else {
+ //Build new metadata based on the consumer position.
+ //We want the next emit to start at the current consumer position,
+ //so make a meta that indicates that position - 1 is the last emitted offset
+ //This helps us avoid cases like STORM-3279, and simplifies the seek logic.
+ long lastEmittedOffset = consumer.position(currBatchTp) -
1;
--- End diff --
Okay, sounds good.
---