Repository: storm
Updated Branches:
  refs/heads/master d06bb3856 -> 08d9d5a87


Add KafkaTupleListener to storm-kafka-client


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6fef40b4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6fef40b4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6fef40b4

Branch: refs/heads/master
Commit: 6fef40b4fbdad74b61fd54ad48085699a74c569c
Parents: 0bf7e70
Author: Bijan Fahimi <bijan.fahimi@outlook>
Authored: Thu Aug 17 21:24:00 2017 +0200
Committer: Bijan Fahimi <bijan.fah...@iqser.com>
Committed: Thu Aug 24 17:37:57 2017 +0200

----------------------------------------------------------------------
 .../kafka/spout/EmptyKafkaTupleListener.java    | 53 +++++++++++++
 .../apache/storm/kafka/spout/KafkaSpout.java    | 15 +++-
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 26 +++++-
 .../storm/kafka/spout/KafkaTupleListener.java   | 83 ++++++++++++++++++++
 4 files changed, 174 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6fef40b4/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
new file mode 100644
index 0000000..621fecd
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+
+package org.apache.storm.kafka.spout;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+public final class EmptyKafkaTupleListener implements KafkaTupleListener {
+    // No-op implementation (see KafkaSpoutConfig.DEFAULT_TUPLE_LISTENER): all callbacks are intentionally empty.
+    @Override
+    public void open(Map<String, Object> conf, TopologyContext context) { }
+
+    @Override
+    public void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId) { }
+
+    @Override
+    public void onAck(KafkaSpoutMessageId msgId) { }
+
+    @Override
+    public void onPartitionsReassigned(Collection<TopicPartition> 
topicPartitions) { }
+
+    @Override
+    public void onRetry(KafkaSpoutMessageId msgId) { }
+
+    @Override
+    public void onMaxRetryReached(KafkaSpoutMessageId msgId) { }
+
+    @Override
+    public String toString() {
+        return "EmptyKafkaTupleListener"; // fixed label; KafkaSpoutConfig.toString() concatenates the listener
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6fef40b4/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index e4b53ab..64e24a6 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -73,7 +73,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // Strategy to determine the fetch offset of the first realized by the 
spout upon activation
     private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  
     // Class that has the logic to handle tuple failure
-    private transient KafkaSpoutRetryService retryService;              
+    private transient KafkaSpoutRetryService retryService;
+    // Handles tuple events (emit, ack etc.)
+    private transient KafkaTupleListener tupleListener;
     // timer == null for auto commit mode
     private transient Timer commitTimer;                                
     // Flag indicating that the spout is still undergoing initialization 
process.
@@ -95,7 +97,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
-        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<K, V>());
+        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>());
     }
 
     //This constructor is here for testing
@@ -121,6 +123,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         // Retries management
         retryService = kafkaSpoutConfig.getRetryService();
 
+        tupleListener = kafkaSpoutConfig.getTupleListener();
+
         if (!consumerAutoCommitMode) {     // If it is auto commit, no need to 
commit offsets manually
             commitTimer = new Timer(TIMER_DELAY_MS, 
kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
         }
@@ -130,6 +134,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         emitted = new HashSet<>();
         waitingToEmit = Collections.emptyListIterator();
 
+        tupleListener.open(conf, context);
+
         LOG.info("Kafka Spout opened with the following configuration: {}", 
kafkaSpoutConfig);
     }
 
@@ -151,6 +157,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, 
consumer={}, topic-partitions={}]",
                     context.getThisTaskId(), 
kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
 
+            tupleListener.onPartitionsReassigned(partitions);
             initialize(partitions);
         }
 
@@ -363,6 +370,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                         } else {
                             collector.emit(tuple, msgId);
                         }
+                        tupleListener.onEmit(tuple, msgId);
                     }
                     LOG.trace("Emitted tuple [{}] for record [{}] with msgId 
[{}]", tuple, record, msgId);
                     return true;
@@ -432,6 +440,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             }
             emitted.remove(msgId);
         }
+        tupleListener.onAck(msgId);
     }
 
     // ======== Fail =======
@@ -449,8 +458,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.debug("Reached maximum number of retries. Message [{}] being 
marked as acked.", msgId);
             // this tuple should be removed from emitted only inside the ack() 
method. This is to ensure
             // that the OffsetManager for that TopicPartition is updated and 
allows commit progression
+            tupleListener.onMaxRetryReached(msgId);
             ack(msgId);
         } else {
+            tupleListener.onRetry(msgId);
             emitted.remove(msgId);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/6fef40b4/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 72fa52e..d1940ec 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -19,13 +19,13 @@
 package org.apache.storm.kafka.spout;
 
 import java.io.Serializable;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -59,6 +59,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), 
TimeInterval.milliSeconds(2),
             DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
 
+    public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new 
EmptyKafkaTupleListener();
+
     // Kafka consumer configuration
     private final Map<String, Object> kafkaProps;
     private final Subscription subscription;
@@ -70,6 +72,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     private final int maxUncommittedOffsets;
     private final FirstPollOffsetStrategy firstPollOffsetStrategy;
     private final KafkaSpoutRetryService retryService;
+    private final KafkaTupleListener tupleListener;
     private final long partitionRefreshPeriodMs;
     private final boolean emitNullTuples;
 
@@ -87,6 +90,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
         this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
         this.retryService = builder.retryService;
+        this.tupleListener = builder.tupleListener;
         this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
         this.emitNullTuples = builder.emitNullTuples;
     }
@@ -124,6 +128,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
         private FirstPollOffsetStrategy firstPollOffsetStrategy = 
DEFAULT_FIRST_POLL_OFFSET_STRATEGY;
         private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
         private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE;
+        private KafkaTupleListener tupleListener = DEFAULT_TUPLE_LISTENER;
         private long partitionRefreshPeriodMs = 
DEFAULT_PARTITION_REFRESH_PERIOD_MS;
         private boolean emitNullTuples = false;
 
@@ -245,6 +250,20 @@ public class KafkaSpoutConfig<K, V> implements 
Serializable {
             return this;
         }
 
+        /**
+         * Sets the tuple listener for the spout to use.
+         * If this method is never called, the builder keeps
+         * {@link KafkaSpoutConfig#DEFAULT_TUPLE_LISTENER}.
+         *
+         * @param tupleListener the tuple listener; must not be null
+         * @return the builder (this).
+         * @throws NullPointerException if {@code tupleListener} is null
+         */
+        public Builder<K, V> setTupleListener(KafkaTupleListener 
tupleListener) {
+            if (tupleListener == null) {
+                throw new NullPointerException("KafkaTupleListener cannot be 
null");
+            }
+            this.tupleListener = tupleListener;
+            return this;
+        }
+
         public Builder<K, V> setRecordTranslator(RecordTranslator<K, V> 
translator) {
             this.translator = translator;
             return this;
@@ -395,6 +414,10 @@ public class KafkaSpoutConfig<K, V> implements 
Serializable {
         return retryService;
     }
 
+    public KafkaTupleListener getTupleListener() {
+        return tupleListener; // the listener copied from the builder in the constructor
+    }
+
     public long getPartitionRefreshPeriodMs() {
         return partitionRefreshPeriodMs;
     }
@@ -414,6 +437,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
             + ", subscription=" + subscription
             + ", translator=" + translator
             + ", retryService=" + retryService
+            + ", tupleListener=" + tupleListener
             + '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/6fef40b4/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java
new file mode 100644
index 0000000..3f16220
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+
+/**
+ * The KafkaTupleListener handles state changes of a Kafka tuple inside a KafkaSpout.
+ * Implementations must be serializable, because this interface extends {@link Serializable}.
+ */
+public interface KafkaTupleListener extends Serializable {
+
+
+    /**
+     * Called during the initialization of the Kafka spout.
+     *
+     * @param conf The Storm configuration.
+     * @param context The {@link TopologyContext}.
+     */
+    void open(Map<String, Object> conf, TopologyContext context);
+
+    /**
+     * Called when the tuple is emitted and auto commit is disabled.
+     * If Kafka auto commit is enabled, the Kafka consumer periodically (depending on the
+     * commit interval) commits the offsets. Storm therefore disables anchoring for tuples
+     * when auto commit is enabled, and the spout will not receive acks or fails for them.
+     *
+     * @param tuple The Storm tuple.
+     * @param msgId The id of the tuple in the spout.
+     */
+    void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId);
+
+    /**
+     * Called when a tuple is acked.
+     * NOTE(review): KafkaSpout appears to call ack() right after
+     * {@link #onMaxRetryReached(KafkaSpoutMessageId)}, so this callback presumably also fires
+     * for messages that exhausted their retries; confirm against KafkaSpout.
+     *
+     * @param msgId The id of the tuple in the spout.
+     */
+    void onAck(KafkaSpoutMessageId msgId);
+
+    /**
+     * Called when Kafka partitions are rebalanced.
+     *
+     * @param partitions The list of partitions that are now assigned to the consumer
+     *                   (may include partitions previously assigned to the consumer).
+     */
+    void onPartitionsReassigned(Collection<TopicPartition> partitions);
+
+    /**
+     * Called when the Kafka spout sets a record for retry.
+     *
+     * @param msgId The id of the tuple in the spout.
+     */
+    void onRetry(KafkaSpoutMessageId msgId);
+
+    /**
+     * Called when the maximum number of retries has been reached.
+     *
+     * @param msgId The id of the tuple in the spout.
+     */
+    void onMaxRetryReached(KafkaSpoutMessageId msgId);
+}

Reply via email to