Repository: storm Updated Branches: refs/heads/master d06bb3856 -> 08d9d5a87
Add KafkaTupleListener to storm-kafka-client Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6fef40b4 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6fef40b4 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6fef40b4 Branch: refs/heads/master Commit: 6fef40b4fbdad74b61fd54ad48085699a74c569c Parents: 0bf7e70 Author: Bijan Fahimi <bijan.fahimi@outlook> Authored: Thu Aug 17 21:24:00 2017 +0200 Committer: Bijan Fahimi <bijan.fah...@iqser.com> Committed: Thu Aug 24 17:37:57 2017 +0200 ---------------------------------------------------------------------- .../kafka/spout/EmptyKafkaTupleListener.java | 53 +++++++++++++ .../apache/storm/kafka/spout/KafkaSpout.java | 15 +++- .../storm/kafka/spout/KafkaSpoutConfig.java | 26 +++++- .../storm/kafka/spout/KafkaTupleListener.java | 83 ++++++++++++++++++++ 4 files changed, 174 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/6fef40b4/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java new file mode 100644 index 0000000..621fecd --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.storm.kafka.spout; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.task.TopologyContext; + +public final class EmptyKafkaTupleListener implements KafkaTupleListener { + + @Override + public void open(Map<String, Object> conf, TopologyContext context) { } + + @Override + public void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId) { } + + @Override + public void onAck(KafkaSpoutMessageId msgId) { } + + @Override + public void onPartitionsReassigned(Collection<TopicPartition> topicPartitions) { } + + @Override + public void onRetry(KafkaSpoutMessageId msgId) { } + + @Override + public void onMaxRetryReached(KafkaSpoutMessageId msgId) { } + + @Override + public String toString() { + return "EmptyKafkaTupleListener"; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/6fef40b4/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index e4b53ab..64e24a6 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -73,7 +73,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // Strategy to determine the fetch offset of the first realized by the spout upon activation private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Class that has the logic to handle tuple failure - private transient KafkaSpoutRetryService retryService; + private transient KafkaSpoutRetryService retryService; + // Handles tuple events (emit, ack etc.) + private transient KafkaTupleListener tupleListener; // timer == null for auto commit mode private transient Timer commitTimer; // Flag indicating that the spout is still undergoing initialization process. @@ -95,7 +97,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) { - this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<K, V>()); + this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>()); } //This constructor is here for testing @@ -121,6 +123,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // Retries management retryService = kafkaSpoutConfig.getRetryService(); + tupleListener = kafkaSpoutConfig.getTupleListener(); + if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS); } @@ -130,6 +134,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout { emitted = new HashSet<>(); waitingToEmit = Collections.emptyListIterator(); + tupleListener.open(conf, context); + LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig); } @@ -151,6 +157,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { LOG.info("Partitions reassignment. 
[task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]", context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + tupleListener.onPartitionsReassigned(partitions); initialize(partitions); } @@ -363,6 +370,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } else { collector.emit(tuple, msgId); } + tupleListener.onEmit(tuple, msgId); } LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId); return true; @@ -432,6 +440,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } emitted.remove(msgId); } + tupleListener.onAck(msgId); } // ======== Fail ======= @@ -449,8 +458,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout { LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId); // this tuple should be removed from emitted only inside the ack() method. This is to ensure // that the OffsetManager for that TopicPartition is updated and allows commit progression + tupleListener.onMaxRetryReached(msgId); ack(msgId); } else { + tupleListener.onRetry(msgId); emitted.remove(msgId); } } http://git-wip-us.apache.org/repos/asf/storm/blob/6fef40b4/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 72fa52e..d1940ec 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -19,13 +19,13 @@ package org.apache.storm.kafka.spout; import java.io.Serializable; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import 
java.util.Set; import java.util.regex.Pattern; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -59,6 +59,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable { new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)); + public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener(); + // Kafka consumer configuration private final Map<String, Object> kafkaProps; private final Subscription subscription; @@ -70,6 +72,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { private final int maxUncommittedOffsets; private final FirstPollOffsetStrategy firstPollOffsetStrategy; private final KafkaSpoutRetryService retryService; + private final KafkaTupleListener tupleListener; private final long partitionRefreshPeriodMs; private final boolean emitNullTuples; @@ -87,6 +90,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy; this.maxUncommittedOffsets = builder.maxUncommittedOffsets; this.retryService = builder.retryService; + this.tupleListener = builder.tupleListener; this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs; this.emitNullTuples = builder.emitNullTuples; } @@ -124,6 +128,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { private FirstPollOffsetStrategy firstPollOffsetStrategy = DEFAULT_FIRST_POLL_OFFSET_STRATEGY; private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS; private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE; + private KafkaTupleListener tupleListener = DEFAULT_TUPLE_LISTENER; private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS; private boolean emitNullTuples = false; @@ -245,6 +250,20 @@ public class KafkaSpoutConfig<K, V> implements 
Serializable { return this; } + /** + * Sets the tuple listener for the spout to use. + * + * @param tupleListener the tuple listener + * @return the builder (this). + */ + public Builder<K, V> setTupleListener(KafkaTupleListener tupleListener) { + if (tupleListener == null) { + throw new NullPointerException("KafkaTupleListener cannot be null"); + } + this.tupleListener = tupleListener; + return this; + } + public Builder<K, V> setRecordTranslator(RecordTranslator<K, V> translator) { this.translator = translator; return this; @@ -395,6 +414,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable { return retryService; } + public KafkaTupleListener getTupleListener() { + return tupleListener; + } + public long getPartitionRefreshPeriodMs() { return partitionRefreshPeriodMs; } @@ -414,6 +437,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { + ", subscription=" + subscription + ", translator=" + translator + ", retryService=" + retryService + + ", tupleListener=" + tupleListener + '}'; } } http://git-wip-us.apache.org/repos/asf/storm/blob/6fef40b4/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java new file mode 100644 index 0000000..3f16220 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.task.TopologyContext; + + +/** + * The KafkaTupleListener handles state changes of a kafka tuple inside a KafkaSpout. + */ +public interface KafkaTupleListener extends Serializable { + + + /** + * Called during the initialization of the kafka spout. + * + * @param conf The storm configuration. + * @param context The {@link TopologyContext}. + */ + void open(Map<String, Object> conf, TopologyContext context); + + /** + * Called when the tuple is emitted and auto commit is disabled. + * If kafka auto commit is enabled, the kafka consumer will periodically (depending on the commit interval) + * commit the offsets. Therefore, storm disables anchoring for tuples when auto commit is enabled and the spout will + * not receive acks or fails for those tuples. + * + * @param tuple the storm tuple. + * @param msgId The id of the tuple in the spout. + */ + void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId); + + /** + * Called when a tuple is acked. The spout also acks tuples that reached the maximum number of retries, so this can follow {@link #onMaxRetryReached}. + * + * @param msgId The id of the tuple in the spout. + */ + void onAck(KafkaSpoutMessageId msgId); + + /** + * Called when kafka partitions are rebalanced.
+ * + * @param partitions The collection of partitions that are now assigned to the consumer (may include partitions previously + * assigned to the consumer). + */ + void onPartitionsReassigned(Collection<TopicPartition> partitions); + + /** + * Called when the Kafka spout schedules a failed record for retry. + * + * @param msgId The id of the tuple in the spout. + */ + void onRetry(KafkaSpoutMessageId msgId); + + /** + * Called when the maximum number of retries has been reached. + * + * @param msgId The id of the tuple in the spout. + */ + void onMaxRetryReached(KafkaSpoutMessageId msgId); +}