GitHub user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2907#discussion_r233750714
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
---
@@ -218,26 +228,27 @@ private void emitTuple(TridentCollector collector,
ConsumerRecord<K, V> record)
* <ul>
* <li>This is the first batch for this partition</li>
* <li>This is a replay of the first batch for this partition</li>
- * <li>This is batch n for this partition, where batch 0...n-1 were
all empty</li>
* </ul>
*
* @return the offset of the next fetch
*/
private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata
lastBatchMeta) {
- if (isFirstPoll(tp)) {
- if (firstPollOffsetStrategy == EARLIEST) {
+ if (isFirstPollSinceExecutorStarted(tp)) {
+ boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta ==
null
+ ||
!topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
+ if (firstPollOffsetStrategy == EARLIEST &&
isFirstPollSinceTopologyWasDeployed) {
LOG.debug("First poll for topic partition [{}], seeking to
partition beginning", tp);
consumer.seekToBeginning(Collections.singleton(tp));
- } else if (firstPollOffsetStrategy == LATEST) {
+ } else if (firstPollOffsetStrategy == LATEST &&
isFirstPollSinceTopologyWasDeployed) {
LOG.debug("First poll for topic partition [{}], seeking to
partition end", tp);
consumer.seekToEnd(Collections.singleton(tp));
} else if (lastBatchMeta != null) {
LOG.debug("First poll for topic partition [{}], using last
batch metadata", tp);
consumer.seek(tp, lastBatchMeta.getLastOffset() + 1); //
seek next offset after last offset from previous batch
- } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
+ } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST ||
firstPollOffsetStrategy == EARLIEST) {
--- End diff --
No, doesn't look like it. Good catch.
---