Repository: storm
Updated Branches:
  refs/heads/1.x-branch 4e1e62667 -> d5ead6681


STORM-3046: Ensure KafkaTridentSpoutEmitter handles empty batches correctly when they occur at the beginning of the stream


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e63f5528
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e63f5528
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e63f5528

Branch: refs/heads/1.x-branch
Commit: e63f5528bc2f807578560ec3f783f326e1aec2a0
Parents: 554ff4a
Author: Stig Rohde Døssing <s...@apache.org>
Authored: Sat Apr 28 13:01:44 2018 +0200
Committer: Stig Rohde Døssing <s...@apache.org>
Committed: Tue May 29 21:09:46 2018 +0200

----------------------------------------------------------------------
 .../spout/trident/KafkaTridentSpoutEmitter.java | 54 ++++++++++++--------
 1 file changed, 32 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e63f5528/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index 3b4aa4b..167cbab 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -59,11 +59,11 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
 
     // Kafka
     private final KafkaConsumer<K, V> kafkaConsumer;
-
+    
     // Bookkeeping
     private final KafkaTridentSpoutManager<K, V> kafkaManager;
-    // set of topic-partitions for which first poll has already occurred, and 
the first polled txid
-    private final Map<TopicPartition, Long> firstPollTransaction = new 
HashMap<>(); 
+    // The first seek offset for each topic partition, i.e. the offset this 
spout instance started processing at.
+    private final Map<TopicPartition, Long> tpToFirstSeekOffset = new 
HashMap<>(); 
 
     // Declare some KafkaTridentSpoutManager references for convenience
     private final long pollTimeoutMs;
@@ -86,7 +86,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
         this.topologyContext = topologyContext;
         this.refreshSubscriptionTimer = refreshSubscriptionTimer;
         this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator();
-
+    
         final KafkaSpoutConfig<K, V> kafkaSpoutConfig = 
kafkaManager.getKafkaSpoutConfig();
         this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
         this.firstPollOffsetStrategy = 
kafkaSpoutConfig.getFirstPollOffsetStrategy();
@@ -124,7 +124,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
                 // pause other topic-partitions to only poll from current 
topic-partition
                 pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
 
-                seek(currBatchTp, lastBatchMeta, tx.getTransactionId());
+                seek(currBatchTp, lastBatchMeta);
 
                 // poll
                 if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
@@ -162,16 +162,17 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
      * Determines the offset of the next fetch. Will use the 
firstPollOffsetStrategy if this is the first poll for the topic partition.
      * Otherwise the next offset will be one past the last batch, based on 
lastBatchMeta.
      * 
-     * <p>lastBatchMeta should only be null when the previous txid was not 
emitted (e.g. new topic),
-     * it is the first poll for the spout instance, or it is a replay of the 
first txid this spout emitted on this partition.
-     * In the second case, there are either no previous transactions, or the 
MBC is still committing them
-     * and they will fail because this spout did not emit the corresponding 
batches. If it had emitted them, the meta could not be null. 
-     * In any case, the lastBatchMeta should never be null if this is not the 
first poll for this spout instance.
+     * <p>lastBatchMeta should only be null in the following cases:
+     * <ul>
+     * <li>This is the first batch for this partition</li>
+     * <li>This is a replay of the first batch for this partition</li>
+     * <li>This is batch n for this partition, where batch 0...n-1 were all 
empty</li>
+     * </ul>
      *
      * @return the offset of the next fetch
      */
-    private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata 
lastBatchMeta, long transactionId) {
-        if (isFirstPoll(tp, transactionId)) {
+    private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata 
lastBatchMeta) {
+        if (isFirstPoll(tp)) {
             if (firstPollOffsetStrategy == EARLIEST) {
                 LOG.debug("First poll for topic partition [{}], seeking to 
partition beginning", tp);
                 kafkaConsumer.seekToBeginning(Collections.singleton(tp));
@@ -188,10 +189,20 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
                 LOG.debug("First poll for topic partition [{}] with no last 
batch metadata, seeking to partition end", tp);
                 kafkaConsumer.seekToEnd(Collections.singleton(tp));
             }
-            firstPollTransaction.put(tp, transactionId);
-        } else {
+            tpToFirstSeekOffset.put(tp, kafkaConsumer.position(tp));
+        } else if (lastBatchMeta != null) {
             kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // 
seek next offset after last offset from previous batch
             LOG.debug("First poll for topic partition [{}], using last batch 
metadata", tp);
+        } else {
+            /*
+             * Last batch meta is null, but this is not the first batch 
emitted for this partition by this emitter instance.
+             * This is either a replay of the first batch for this partition, 
or all previous batches were empty,
+             * otherwise last batch meta could not be null. Use the offset the 
consumer started at. 
+             */
+            long initialFetchOffset = tpToFirstSeekOffset.get(tp);
+            kafkaConsumer.seek(tp, initialFetchOffset);
+            LOG.debug("First poll for topic partition [{}], no last batch 
metadata present."
+                + " Using stored initial fetch offset [{}]", tp, 
initialFetchOffset);
         }
 
         final long fetchOffset = kafkaConsumer.position(tp);
@@ -199,9 +210,8 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
         return fetchOffset;
     }
 
-    private boolean isFirstPoll(TopicPartition tp, long txid) {
-        // The first poll is either the "real" first transaction, or a replay 
of the first transaction
-        return !firstPollTransaction.containsKey(tp) || 
firstPollTransaction.get(tp) == txid;
+    private boolean isFirstPoll(TopicPartition tp) {
+        return !tpToFirstSeekOffset.containsKey(tp);
     }
 
     // returns paused topic-partitions.
@@ -248,14 +258,14 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
         LOG.debug("Returning topic-partitions {} for task with index [{}]", 
taskTps, taskId);
         return taskTps;
     }
-
+    
     private List<KafkaTridentSpoutTopicPartition> 
newKafkaTridentSpoutTopicPartitions(Collection<TopicPartition> tps) {
         final List<KafkaTridentSpoutTopicPartition> kttp = new ArrayList<>(tps 
== null ? 0 : tps.size());
         if (tps != null) {
-            for (TopicPartition tp : tps) {
-                LOG.trace("Added topic-partition [{}]", tp);
-                kttp.add(new KafkaTridentSpoutTopicPartition(tp));
-            }
+        for (TopicPartition tp : tps) {
+            LOG.trace("Added topic-partition [{}]", tp);
+            kttp.add(new KafkaTridentSpoutTopicPartition(tp));
+        }
         }
         return kttp;
     }

Reply via email to