Repository: storm Updated Branches: refs/heads/1.x-branch 10ab96a5c -> c4a09d311
Add KafkaTupleListener to storm-kafka-client Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/43625c26 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/43625c26 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/43625c26 Branch: refs/heads/1.x-branch Commit: 43625c2637b65f4452cb21f9983274daf6d7f357 Parents: 4f19fee Author: Bijan Fahimi <[email protected]> Authored: Wed Aug 30 10:16:31 2017 +0200 Committer: Bijan Fahimi <[email protected]> Committed: Wed Aug 30 10:16:31 2017 +0200 ---------------------------------------------------------------------- .../kafka/spout/EmptyKafkaTupleListener.java | 53 +++++++++++++ .../apache/storm/kafka/spout/KafkaSpout.java | 10 +++ .../storm/kafka/spout/KafkaSpoutConfig.java | 38 +++++++-- .../storm/kafka/spout/KafkaTupleListener.java | 83 ++++++++++++++++++++ 4 files changed, 177 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/43625c26/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java new file mode 100644 index 0000000..621fecd --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.storm.kafka.spout; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.task.TopologyContext; + +public final class EmptyKafkaTupleListener implements KafkaTupleListener { + + @Override + public void open(Map<String, Object> conf, TopologyContext context) { } + + @Override + public void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId) { } + + @Override + public void onAck(KafkaSpoutMessageId msgId) { } + + @Override + public void onPartitionsReassigned(Collection<TopicPartition> topicPartitions) { } + + @Override + public void onRetry(KafkaSpoutMessageId msgId) { } + + @Override + public void onMaxRetryReached(KafkaSpoutMessageId msgId) { } + + @Override + public String toString() { + return "EmptyKafkaTupleListener"; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/43625c26/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 6631930..9f806b5 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -73,6 +73,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // Bookkeeping private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first realized by the spout upon activation private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure + private transient KafkaTupleListener tupleListener; // Handles tuple events (emit, ack etc.) private transient Timer commitTimer; // timer == null for auto commit mode private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process. // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() @@ -112,6 +113,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // Retries management retryService = kafkaSpoutConfig.getRetryService(); + tupleListener = kafkaSpoutConfig.getTupleListener(); + tupleListener.open(conf, context); + + if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS); } @@ -142,6 +147,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { LOG.info("Partitions reassignment. 
[task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]", context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + tupleListener.onPartitionsReassigned(partitions); initialize(partitions); } @@ -349,6 +355,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } else { collector.emit(tuple, msgId); } + tupleListener.onEmit(tuple, msgId); } LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId); return true; @@ -418,6 +425,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } emitted.remove(msgId); } + tupleListener.onAck(msgId); } // ======== Fail ======= @@ -434,8 +442,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout { LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId); // this tuple should be removed from emitted only inside the ack() method. This is to ensure // that the OffsetManager for that TopicPartition is updated and allows commit progression + tupleListener.onMaxRetryReached(msgId); ack(msgId); } else { + tupleListener.onRetry(msgId); emitted.remove(msgId); } } http://git-wip-us.apache.org/repos/asf/storm/blob/43625c26/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 833ce4a..79e8189 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -18,13 +18,6 @@ package org.apache.storm.kafka.spout; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import 
org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; -import org.apache.storm.tuple.Fields; - import java.io.Serializable; import java.util.Collection; import java.util.HashMap; @@ -34,7 +27,14 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.regex.Pattern; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; +import org.apache.storm.tuple.Fields; /** * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics @@ -57,6 +57,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable { new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)); + public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener(); + // Kafka consumer configuration private final Map<String, Object> kafkaProps; private final Subscription subscription; @@ -68,6 +70,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { private final int maxUncommittedOffsets; private final FirstPollOffsetStrategy firstPollOffsetStrategy; private final KafkaSpoutRetryService retryService; + private final KafkaTupleListener tupleListener; private final long partitionRefreshPeriodMs; private final boolean emitNullTuples; private final SerializableDeserializer<K> keyDes; @@ -89,6 +92,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { this.firstPollOffsetStrategy = 
builder.firstPollOffsetStrategy; this.maxUncommittedOffsets = builder.maxUncommittedOffsets; this.retryService = builder.retryService; + this.tupleListener = builder.tupleListener; this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs; this.emitNullTuples = builder.emitNullTuples; this.keyDes = builder.keyDes; @@ -134,6 +138,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { private FirstPollOffsetStrategy firstPollOffsetStrategy = DEFAULT_FIRST_POLL_OFFSET_STRATEGY; private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS; private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE; + private KafkaTupleListener tupleListener = DEFAULT_TUPLE_LISTENER; private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS; private boolean emitNullTuples = false; @@ -495,6 +500,20 @@ public class KafkaSpoutConfig<K, V> implements Serializable { return this; } + /** + * Sets the tuple listener for the spout to use. + * + * @param tupleListener the tuple listener + * @return the builder (this). 
+ */ + public Builder<K, V> setTupleListener(KafkaTupleListener tupleListener) { + if (tupleListener == null) { + throw new NullPointerException("KafkaTupleListener cannot be null"); + } + this.tupleListener = tupleListener; + return this; + } + public Builder<K, V> setRecordTranslator(RecordTranslator<K, V> translator) { this.translator = translator; return this; @@ -675,6 +694,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable { return retryService; } + public KafkaTupleListener getTupleListener() { + return tupleListener; + } + public long getPartitionRefreshPeriodMs() { return partitionRefreshPeriodMs; } @@ -696,6 +719,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { + ", subscription=" + subscription + ", translator=" + translator + ", retryService=" + retryService + + ", tupleListener=" + tupleListener + '}'; } } http://git-wip-us.apache.org/repos/asf/storm/blob/43625c26/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java new file mode 100644 index 0000000..3f16220 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.task.TopologyContext; + + +/** + * The KafkaTupleListener handles state changes of a kafka tuple inside a KafkaSpout. + */ +public interface KafkaTupleListener extends Serializable { + + + /** + * Called during the initialization of the kafka spout. + * + * @param conf The storm configuration. + * @param context The {@link TopologyContext} + */ + void open(Map<String, Object> conf, TopologyContext context); + + /** + * Called when the tuple is emitted and auto commit is disabled. + * If kafka auto commit is enabled, the kafka consumer will periodically (depending on the commit interval) + * commit the offsets. Therefore, storm disables anchoring for tuples when auto commit is enabled and the spout will + * not receive acks and fails for those tuples. + * + * @param tuple the storm tuple. + * @param msgId The id of the tuple in the spout. + */ + void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId); + + /** + * Called when a tuple is acked. + * + * @param msgId The id of the tuple in the spout. + */ + void onAck(KafkaSpoutMessageId msgId); + + /** + * Called when kafka partitions are rebalanced. 
+ * + * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously + * assigned to the consumer) + */ + void onPartitionsReassigned(Collection<TopicPartition> partitions); + + /** + * Called when the Kafka spout sets a record for retry. + * + * @param msgId The id of the tuple in the spout. + */ + void onRetry(KafkaSpoutMessageId msgId); + + /** + * Called when the maximum number of retries has been reached. + * + * @param msgId The id of the tuple in the spout. + */ + void onMaxRetryReached(KafkaSpoutMessageId msgId); +}
