Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2907#discussion_r234329136

    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
    @@ -170,15 +170,25 @@ public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collect
     
             seek(currBatchTp, lastBatchMeta);
     
    -        final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
    -        LOG.debug("Polled [{}] records from Kafka.", records.count());
    +        final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
    +        LOG.debug("Polled [{}] records from Kafka.", records.size());
     
             if (!records.isEmpty()) {
                 for (ConsumerRecord<K, V> record : records) {
                     emitTuple(collector, record);
                 }
    -            // build new metadata
    -            currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp), this.topologyContext.getStormId());
    +            // build new metadata based on emitted records
    +            currentBatch = new KafkaTridentSpoutBatchMetadata(
    +                records.get(0).offset(),
    +                records.get(records.size() - 1).offset(),
    +                topologyContext.getStormId());
    +        } else {
    +            //Build new metadata based on the consumer position.
    +            //We want the next emit to start at the current consumer position,
    +            //so make a meta that indicates that position - 1 is the last emitted offset
    +            //This helps us avoid cases like STORM-3279, and simplifies the seek logic.
    +            long lastEmittedOffset = consumer.position(currBatchTp) - 1;
    --- End diff --

    Is there ever a chance that `lastEmittedOffset` could end up being negative? I am thinking specifically of starting up a topology that reads from a new/empty topic.
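To make the edge case concrete, here is a minimal, Kafka-free sketch of the offset arithmetic in the diff. `LastEmittedOffsetSketch` and `lastEmittedOffset` are hypothetical names, not storm-kafka-client APIs: the list of offsets stands in for the records polled for `currBatchTp`, and `consumerPosition` stands in for `consumer.position(currBatchTp)`. It assumes that a consumer on a new, empty partition reports position 0.

```java
import java.util.List;

public class LastEmittedOffsetSketch {

    /**
     * Hypothetical helper mirroring the diff's rule for the "last emitted offset"
     * stored in the batch metadata. Not a storm-kafka-client API.
     *
     * @param emittedOffsets   offsets of the records emitted in this batch, in order
     * @param consumerPosition stand-in for consumer.position(tp) after the poll
     */
    static long lastEmittedOffset(List<Long> emittedOffsets, long consumerPosition) {
        if (!emittedOffsets.isEmpty()) {
            // Records were emitted: the last emitted record's offset is used.
            return emittedOffsets.get(emittedOffsets.size() - 1);
        }
        // Nothing emitted: treat position - 1 as the last emitted offset,
        // so the next batch starts at the consumer's current position.
        return consumerPosition - 1;
    }

    public static void main(String[] args) {
        // New/empty topic, assuming the position resolves to offset 0.
        System.out.println(lastEmittedOffset(List.of(), 0L));           // prints -1
        // Records at offsets 5..7 were emitted.
        System.out.println(lastEmittedOffset(List.of(5L, 6L, 7L), 8L)); // prints 7
        // Caught up at position 8 with nothing new to read.
        System.out.println(lastEmittedOffset(List.of(), 8L));           // prints 7
    }
}
```

Under that assumption, the empty-topic case does yield -1. If the next batch starts at `lastEmittedOffset + 1`, as the diff's comment describes, that arithmetic lands back on 0. Any code that uses the stored value directly, for example as a seek target, would see -1, which is not a valid Kafka record offset.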
---