GitHub user knusbaum commented on a diff in the pull request:
https://github.com/apache/storm/pull/1131#discussion_r53680233
--- Diff:
external/storm-kafka-new-consumer-api/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class KafkaSpout<K,V> extends BaseRichSpout {
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaSpout.class);
+ private static final
Comparator<org.apache.storm.kafka.spout.MessageId> OFFSET_COMPARATOR = new
OffsetComparator();
+
+ // Storm
+ private Map conf;
+ private TopologyContext context;
+ protected SpoutOutputCollector collector;
+
+ // Kafka
+ private final org.apache.storm.kafka.spout.KafkaSpoutConfig<K, V>
kafkaSpoutConfig;
+ private KafkaConsumer<K, V> kafkaConsumer;
+
+ // Bookkeeping
+ private org.apache.storm.kafka.spout.KafkaSpoutStream kafkaSpoutStream;
+ private org.apache.storm.kafka.spout.KafkaTupleBuilder<K,V>
tupleBuilder;
+ private transient ScheduledExecutorService commitOffsetsTask;
+ private transient Lock ackCommitLock;
+ private transient volatile boolean commit;
+ private transient Map<org.apache.storm.kafka.spout.MessageId, Values>
emittedTuples; // Keeps a list of emitted tuples that are pending
being acked or failed
+ private transient Map<TopicPartition,
Set<org.apache.storm.kafka.spout.MessageId>> failed; // failed tuples. They
stay in this list until success or max retries is reached
+ private transient Map<TopicPartition, OffsetEntry> acked; //
emitted tuples that were successfully acked. These tuples will be committed by
the commitOffsetsTask or on consumer rebalance
+ private transient Set<org.apache.storm.kafka.spout.MessageId>
blackList; // all the tuples that are in traffic when the
rebalance occurs will be added to black list to be disregarded when they are
either acked or failed
+ private transient int maxRetries;
+
+ public KafkaSpout(org.apache.storm.kafka.spout.KafkaSpoutConfig<K,V>
kafkaSpoutConfig, org.apache.storm.kafka.spout.KafkaSpoutStream
kafkaSpoutStream, org.apache.storm.kafka.spout.KafkaTupleBuilder<K,V>
tupleBuilder) {
+ this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass
in configuration
+ this.kafkaSpoutStream = kafkaSpoutStream;
+ this.tupleBuilder = tupleBuilder;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
+ // Spout internals
+ this.conf = conf;
+ this.context = context;
+ this.collector = collector;
+
+ // Bookkeeping objects
+ emittedTuples = new HashMap<>();
+ failed = new HashMap<>();
+ acked = new HashMap<>();
+ blackList = new HashSet<>();
+ ackCommitLock = new ReentrantLock();
+ maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
+
+ // Kafka consumer
+ kafkaConsumer = new
KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
+ kafkaSpoutConfig.getKeyDeserializer(),
kafkaSpoutConfig.getValueDeserializer());
+ kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(),
new KafkaSpoutConsumerRebalanceListener());
+
+ // Create commit offsets task
+ if (!kafkaSpoutConfig.isConsumerAutoCommitMode()) { // If it
is auto commit, no need to commit offsets manually
+ createCommitOffsetsTask();
+ }
+ }
+
+ // ======== Commit Offsets Task =======
+
+ private void createCommitOffsetsTask() {
+ commitOffsetsTask =
Executors.newSingleThreadScheduledExecutor(commitOffsetsThreadFactory());
+ commitOffsetsTask.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ commit = true;
+ }
+ }, 1000, kafkaSpoutConfig.getOffsetsCommitFreqMs(),
TimeUnit.MILLISECONDS);
+ }
+
+ private ThreadFactory commitOffsetsThreadFactory() {
+ return new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "kafka-spout-commit-offsets-thread");
+ }
+ };
+ }
+
+ // ======== Next Tuple =======
+
+ @Override
+ public void nextTuple() {
+ if(commit) {
+ commitAckedTuples();
+ } else if (retry()) { // Don't process new tuples
until the failed tuples have all been acked
+ retryFailedTuples();
+ } else {
+ emitTuples(poll());
+ }
+ }
+
+ private ConsumerRecords<K, V> poll() {
+ final ConsumerRecords<K, V> consumerRecords =
kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+ LOG.debug("Polled [{]} records from Kafka",
consumerRecords.count());
+ return consumerRecords;
+ }
+
+ private void emitTuples(ConsumerRecords<K, V> consumerRecords) {
+ for (TopicPartition tp : consumerRecords.partitions()) {
+ final Iterable<ConsumerRecord<K, V>> records =
consumerRecords.records(tp.topic()); // TODO Decide if want to give
flexibility to emmit/poll either per topic or per partition
+ for (ConsumerRecord<K, V> record : records) {
+ final Values tuple = tupleBuilder.buildTuple(record);
+ final org.apache.storm.kafka.spout.MessageId messageId =
new org.apache.storm.kafka.spout.MessageId(record);
// TODO don't create message for non acking mode. Should we support non
acking mode?
+ collector.emit(kafkaSpoutStream.getStreamId(), tuple,
messageId); // emits one tuple per record
+ emittedTuples.put(messageId, tuple);
+ LOG.info("HMCL - Emitted tuple for record {}", record);
+ }
+ }
+ }
+
+ private boolean retry() {
+ return failed.size() > 0;
+ }
+
+ private void retryFailedTuples() {
+ for (TopicPartition tp : failed.keySet()) {
+ for (org.apache.storm.kafka.spout.MessageId msgId :
failed.get(tp)) {
+ if (isInBlackList(msgId)) {
+ removeFromBlacklist(msgId);
+ removeFromFailed(tp, msgId);
+ } else {
+ final Values tuple = emittedTuples.get(msgId);
+ LOG.debug("Retrying tuple. [msgId={}, tuple={}]",
msgId, tuple);
+ collector.emit(kafkaSpoutStream.getStreamId(), tuple,
msgId);
+ }
+ }
+ }
+ }
+
+ // all the tuples that are in traffic when the rebalance occurs will
be added
+ // to black list to be disregarded when they are either acked or failed
+ private boolean isInBlackList(org.apache.storm.kafka.spout.MessageId
msgId) {
+ return blackList.contains(msgId);
+ }
+
+ private void
removeFromBlacklist(org.apache.storm.kafka.spout.MessageId msgId) {
+ blackList.remove(msgId);
+ }
+
+ // ======== Ack =======
+
+ @Override
+ public void ack(Object messageId) {
+ final org.apache.storm.kafka.spout.MessageId msgId =
(org.apache.storm.kafka.spout.MessageId) messageId;
+ final TopicPartition tp = msgId.getTopicPartition();
+
+ if (isInBlackList(msgId)) {
+ removeFromBlacklist(msgId);
+ } else {
+ addAckedTuples(tp, msgId);
+ // Removed acked tuples from the emittedTuples data structure
+ emittedTuples.remove(msgId);
+ // if this acked msg is a retry, remove it from failed data
structure
+ removeFromFailed(tp, msgId);
+ }
+ }
+
+ private void addAckedTuples(TopicPartition tp,
org.apache.storm.kafka.spout.MessageId msgId) {
+ // lock because ack and commit happen in different threads
+ ackCommitLock.lock();
--- End diff --
`ackCommitLock` can be replaced with a synchronized block or method.
There's no reason in this class to do explicit lock/unlock operations.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---