Repository: storm
Updated Branches:
  refs/heads/master 402a371cc -> 19cdedfc8


STORM-2994: KafkaSpout consumes messages but doesn't commit offsets


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/33961ac7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/33961ac7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/33961ac7

Branch: refs/heads/master
Commit: 33961ac7b76589a1fc8ecb1d4559517f755bc7d5
Parents: ffa607e
Author: Rui Abreu <[email protected]>
Authored: Tue Apr 3 10:21:05 2018 +0100
Committer: Rui Abreu <[email protected]>
Committed: Tue Apr 3 10:27:30 2018 +0100

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 35 +++++---
 .../storm/kafka/spout/KafkaSpoutMessageId.java  | 28 +++---
 .../kafka/spout/KafkaSpoutNullTupleTest.java    | 93 ++++++++++++++++++++
 .../SingleTopicKafkaSpoutConfiguration.java     |  3 +
 4 files changed, 131 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/33961ac7/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 9f9f5bb..901e97f 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -24,7 +24,7 @@ import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
 import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -49,7 +49,6 @@ import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
-import org.apache.storm.kafka.spout.internal.CommitMetadata;
 import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
@@ -280,7 +279,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 if (isAtLeastOnceProcessing()) {
                     commitOffsetsForAckedTuples(kafkaConsumer.assignment());
                 } else if (kafkaSpoutConfig.getProcessingGuarantee() == 
ProcessingGuarantee.NO_GUARANTEE) {
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
+                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                         
createFetchedOffsetsMetadata(kafkaConsumer.assignment());
                     kafkaConsumer.commitAsync(offsetsToCommit, null);
                     LOG.debug("Committed offsets {} to Kafka", 
offsetsToCommit);
@@ -368,7 +367,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 numPolledRecords);
             if (kafkaSpoutConfig.getProcessingGuarantee() == 
KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                 //Commit polled records immediately to ensure delivery is 
at-most-once.
-                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
+                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                     createFetchedOffsetsMetadata(kafkaConsumer.assignment());
                 kafkaConsumer.commitSync(offsetsToCommit);
                 LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
@@ -484,8 +483,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     return true;
                 }
             } else {
+                /*if a null tuple is not configured to be emitted, it should 
be marked as emitted and acked immediately
+                * to allow its offset to be committed to Kafka*/
                 LOG.debug("Not emitting null tuple for record [{}] as defined 
in configuration.", record);
-                msgId.setEmitted(false);
+                msgId.setNullTuple(true);
+                offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
                 ack(msgId);
             }
         }
@@ -506,7 +508,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
         return offsetsToCommit;
     }
-    
+
     private void commitOffsetsForAckedTuples(Set<TopicPartition> 
assignedPartitions) {
         // Find offsets that are ready to be committed for every assigned 
topic partition
         final Map<TopicPartition, OffsetManager> assignedOffsetManagers = 
offsetManagers.entrySet().stream()
@@ -570,17 +572,22 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         // Only need to keep track of acked tuples if commits to Kafka are 
controlled by
         // tuple acks, which happens only for at-least-once processing 
semantics
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+
+        if (msgId.isNullTuple()) {
+            //a null tuple should be added to the ack list since by definition 
it is a direct ack
+            offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
+            LOG.debug("Received direct ack for message [{}], associated with 
null tuple", msgId);
+            tupleListener.onAck(msgId);
+            return;
+        }
+
         if (!emitted.contains(msgId)) {
-            if (msgId.isEmitted()) {
-                LOG.debug("Received ack for message [{}], associated with 
tuple emitted for a ConsumerRecord that "
-                    + "came from a topic-partition that this consumer group 
instance is no longer tracking "
-                    + "due to rebalance/partition reassignment. No action 
taken.", msgId);
-            } else {
-                LOG.debug("Received direct ack for message [{}], associated 
with null tuple", msgId);
-            }
+            LOG.debug("Received ack for message [{}], associated with tuple 
emitted for a ConsumerRecord that "
+                        + "came from a topic-partition that this consumer 
group instance is no longer tracking "
+                        + "due to rebalance/partition reassignment. No action 
taken.", msgId);
         } else {
             Validate.isTrue(!retryService.isScheduled(msgId), "The message id 
" + msgId + " is queued for retry while being acked."
-                + " This should never occur barring errors in the RetryService 
implementation or the spout code.");
+                        + " This should never occur barring errors in the 
RetryService implementation or the spout code.");
             offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
             emitted.remove(msgId);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/33961ac7/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index 1626fee..ddf6391 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -27,33 +27,33 @@ public class KafkaSpoutMessageId implements Serializable {
     private final long offset;
     private int numFails = 0;
     /**
-     * true if the record was emitted using a form of collector.emit(...). 
false
+     * false if the record was emitted using a form of collector.emit(...). 
true
      * when skipping null tuples as configured by the user in KafkaSpoutConfig
      */
-    private boolean emitted;
+    private boolean nullTuple;
 
     public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) {
-        this(consumerRecord, true);
+        this(consumerRecord, false);
     }
 
-    public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord, boolean 
emitted) {
-        this(new TopicPartition(consumerRecord.topic(), 
consumerRecord.partition()), consumerRecord.offset(), emitted);
+    public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord, boolean 
nullTuple) {
+        this(new TopicPartition(consumerRecord.topic(), 
consumerRecord.partition()), consumerRecord.offset(), nullTuple);
     }
 
     public KafkaSpoutMessageId(TopicPartition topicPart, long offset) {
-        this(topicPart, offset, true);
+        this(topicPart, offset, false);
     }
 
     /**
      * Creates a new KafkaSpoutMessageId.
      * @param topicPart The topic partition this message belongs to
      * @param offset The offset of this message
-     * @param emitted True iff this message is not being skipped as a null 
tuple
+     * @param nullTuple True if this message is being skipped as a null tuple
      */
-    public KafkaSpoutMessageId(TopicPartition topicPart, long offset, boolean 
emitted) {
+    public KafkaSpoutMessageId(TopicPartition topicPart, long offset, boolean 
nullTuple) {
         this.topicPart = topicPart;
         this.offset = offset;
-        this.emitted = emitted;
+        this.nullTuple = nullTuple;
     }
 
     public int partition() {
@@ -80,12 +80,12 @@ public class KafkaSpoutMessageId implements Serializable {
         return topicPart;
     }
 
-    public boolean isEmitted() {
-        return emitted;
+    public boolean isNullTuple() {
+        return nullTuple;
     }
 
-    public void setEmitted(boolean emitted) {
-        this.emitted = emitted;
+    public void setNullTuple(boolean nullTuple) {
+        this.nullTuple = nullTuple;
     }
 
     @Override
@@ -94,7 +94,7 @@ public class KafkaSpoutMessageId implements Serializable {
             + "topic-partition=" + topicPart
             + ", offset=" + offset
             + ", numFails=" + numFails
-            + ", emitted=" + emitted
+            + ", nullTuple=" + nullTuple
             + '}';
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/33961ac7/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
new file mode 100644
index 0000000..0bbdb55
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Time;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest {
+
+    public KafkaSpoutNullTupleTest() {
+        super(2_000);
+    }
+
+
+    @Override
+    KafkaSpoutConfig<String, String> createSpoutConfig() {
+
+        return KafkaSpoutConfig.builder("127.0.0.1:" + 
kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+                Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC))
+                .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+                .setRecordTranslator(new NullRecordExtractor())
+                .build();
+    }
+
+    @Test
+    public void testShouldCommitAllMessagesIfNotSetToEmitNullTuples() throws 
Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        //All null tuples should be committed, meaning they were considered 
to be emitted and acked
+        for(int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+
+        verify(collectorMock,never()).emit(
+                anyString(),
+                anyList(),
+                any());
+
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        //Commit offsets
+        spout.nextTuple();
+
+        verifyAllMessagesCommitted(messageCount);
+    }
+
+    private class NullRecordExtractor implements RecordTranslator {
+
+        @Override
+        public List<Object> apply(ConsumerRecord record) {
+            return null;
+
+        }
+
+        @Override
+        public Fields getFieldsFor(String stream) {
+            return new Fields("topic", "key", "value");
+        }
+
+        @Override
+        public Object apply(Object record) {
+            return null;
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/33961ac7/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
index 4896267..f7e0a96 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
@@ -29,6 +29,7 @@ import org.apache.storm.kafka.spout.subscription.TopicFilter;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
+
 public class SingleTopicKafkaSpoutConfiguration {
 
     public static final String STREAM = "test_stream";
@@ -54,6 +55,7 @@ public class SingleTopicKafkaSpoutConfiguration {
             .setPollTimeoutMs(1000);
     }
 
+
     protected static KafkaSpoutRetryService getNoDelayRetryService() {
         /**
          * Retry in a tight loop (keep unit tests fasts).
@@ -61,4 +63,5 @@ public class SingleTopicKafkaSpoutConfiguration {
         return new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
 KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
             DEFAULT_MAX_RETRIES, 
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
     }
+
 }

Reply via email to