[
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15157772#comment-15157772
]
ASF GitHub Bot commented on STORM-822:
--------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1131#discussion_r53699257
--- Diff: external/storm-kafka-new-consumer-api/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class KafkaSpout<K,V> extends BaseRichSpout {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
+ private static final Comparator<org.apache.storm.kafka.spout.MessageId> OFFSET_COMPARATOR = new OffsetComparator();
+
+ // Storm
+ private Map conf;
+ private TopologyContext context;
+ protected SpoutOutputCollector collector;
+
+ // Kafka
+ private final org.apache.storm.kafka.spout.KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+ private KafkaConsumer<K, V> kafkaConsumer;
+
+ // Bookkeeping
+ private org.apache.storm.kafka.spout.KafkaSpoutStream kafkaSpoutStream;
+ private org.apache.storm.kafka.spout.KafkaTupleBuilder<K,V> tupleBuilder;
+ private transient ScheduledExecutorService commitOffsetsTask;
+ private transient Lock ackCommitLock;
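+ // Note (editorial): the commit flag below is set by the commit timer thread and read in nextTuple(); the actual offset commit runs on the spout thread, since KafkaConsumer is not safe for multithreaded access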
+ private transient volatile boolean commit;
+ private transient Map<org.apache.storm.kafka.spout.MessageId, Values> emittedTuples; // Keeps a list of emitted tuples that are pending being acked or failed
+ private transient Map<TopicPartition, Set<org.apache.storm.kafka.spout.MessageId>> failed; // failed tuples. They stay in this list until success or max retries is reached
+ private transient Map<TopicPartition, OffsetEntry> acked; // emitted tuples that were successfully acked. These tuples will be committed by the commitOffsetsTask or on consumer rebalance
+ private transient Set<org.apache.storm.kafka.spout.MessageId> blackList; // all the tuples that are in traffic when the rebalance occurs will be added to black list to be disregarded when they are either acked or failed
+ private transient int maxRetries;
+
+ public KafkaSpout(org.apache.storm.kafka.spout.KafkaSpoutConfig<K,V> kafkaSpoutConfig, org.apache.storm.kafka.spout.KafkaSpoutStream kafkaSpoutStream, org.apache.storm.kafka.spout.KafkaTupleBuilder<K,V> tupleBuilder) {
+ this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration
+ this.kafkaSpoutStream = kafkaSpoutStream;
+ this.tupleBuilder = tupleBuilder;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ // Spout internals
+ this.conf = conf;
+ this.context = context;
+ this.collector = collector;
+
+ // Bookkeeping objects
+ emittedTuples = new HashMap<>();
+ failed = new HashMap<>();
+ acked = new HashMap<>();
+ blackList = new HashSet<>();
+ ackCommitLock = new ReentrantLock();
+ maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
+
+ // Kafka consumer
+ kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
+ kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+ kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(), new KafkaSpoutConsumerRebalanceListener());
+
+ // Create commit offsets task
+ if (!kafkaSpoutConfig.isConsumerAutoCommitMode()) { // If it is auto commit, no need to commit offsets manually
+ createCommitOffsetsTask();
+ }
+ }
+
+ // ======== Commit Offsets Task =======
+
+ private void createCommitOffsetsTask() {
+ commitOffsetsTask = Executors.newSingleThreadScheduledExecutor(commitOffsetsThreadFactory());
+ commitOffsetsTask.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ commit = true;
+ }
+ }, 1000, kafkaSpoutConfig.getOffsetsCommitFreqMs(), TimeUnit.MILLISECONDS);
+ }
+
+ private ThreadFactory commitOffsetsThreadFactory() {
+ return new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "kafka-spout-commit-offsets-thread");
+ }
+ };
+ }
+
+ // ======== Next Tuple =======
+
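+ // Note (editorial): each call to nextTuple() does exactly one of three things, in priority order: commit acked offsets when the timer flag is set, replay failed tuples, or poll Kafka and emit new tuples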
+ @Override
+ public void nextTuple() {
+ if (commit) {
+ commitAckedTuples();
+ } else if (retry()) { // Don't process new tuples until the failed tuples have all been acked
+ retryFailedTuples();
+ } else {
+ emitTuples(poll());
+ }
+ }
+
+ private ConsumerRecords<K, V> poll() {
+ final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+ LOG.debug("Polled [{}] records from Kafka", consumerRecords.count());
+ return consumerRecords;
+ }
+
+ private void emitTuples(ConsumerRecords<K, V> consumerRecords) {
+ for (TopicPartition tp : consumerRecords.partitions()) {
+ final Iterable<ConsumerRecord<K, V>> records = consumerRecords.records(tp.topic()); // TODO Decide if we want to give flexibility to emit/poll either per topic or per partition
+ for (ConsumerRecord<K, V> record : records) {
+ final Values tuple = tupleBuilder.buildTuple(record);
+ final org.apache.storm.kafka.spout.MessageId messageId = new org.apache.storm.kafka.spout.MessageId(record); // TODO don't create message for non-acking mode. Should we support non-acking mode?
+ collector.emit(kafkaSpoutStream.getStreamId(), tuple, messageId); // emits one tuple per record
+ emittedTuples.put(messageId, tuple);
+ LOG.info("HMCL - Emitted tuple for record {}", record);
+ }
+ }
+ }
+
+ private boolean retry() {
+ return failed.size() > 0;
+ }
+
+ private void retryFailedTuples() {
+ for (TopicPartition tp : failed.keySet()) {
+ for (org.apache.storm.kafka.spout.MessageId msgId : failed.get(tp)) {
+ if (isInBlackList(msgId)) {
+ removeFromBlacklist(msgId);
+ removeFromFailed(tp, msgId);
+ } else {
+ final Values tuple = emittedTuples.get(msgId);
+ LOG.debug("Retrying tuple. [msgId={}, tuple={}]",
msgId, tuple);
+ collector.emit(kafkaSpoutStream.getStreamId(), tuple,
msgId);
+ }
+ }
+ }
+ }
+
+ // all the tuples that are in traffic when the rebalance occurs will be added
+ // to black list to be disregarded when they are either acked or failed
+ private boolean isInBlackList(org.apache.storm.kafka.spout.MessageId msgId) {
+ return blackList.contains(msgId);
+ }
+
+ private void removeFromBlacklist(org.apache.storm.kafka.spout.MessageId msgId) {
+ blackList.remove(msgId);
+ }
+
+ // ======== Ack =======
+
+ @Override
+ public void ack(Object messageId) {
+ final org.apache.storm.kafka.spout.MessageId msgId = (org.apache.storm.kafka.spout.MessageId) messageId;
+ final TopicPartition tp = msgId.getTopicPartition();
+
+ if (isInBlackList(msgId)) {
+ removeFromBlacklist(msgId);
+ } else {
+ addAckedTuples(tp, msgId);
+ // Remove acked tuples from the emittedTuples data structure
+ emittedTuples.remove(msgId);
+ // if this acked msg is a retry, remove it from the failed data structure
+ removeFromFailed(tp, msgId);
+ }
+ }
+
+ private void addAckedTuples(TopicPartition tp, org.apache.storm.kafka.spout.MessageId msgId) {
+ // lock because ack and commit happen in different threads
+ ackCommitLock.lock();
+ try {
+ if (!acked.containsKey(tp)) {
+ acked.put(tp, new OffsetEntry(tp));
+ }
+ acked.get(tp).add(msgId);
+ } finally {
+ ackCommitLock.unlock();
+ }
+ }
+
+ // ======== Fail =======
+
+ @Override
+ public void fail(Object messageId) {
+ final org.apache.storm.kafka.spout.MessageId msgId = (org.apache.storm.kafka.spout.MessageId) messageId;
+
+ if (isInBlackList(msgId)) {
+ removeFromBlacklist(msgId);
+ } else {
+ final TopicPartition tp = msgId.getTopicPartition();
+ // limit to max number of retries
+ if (msgId.numFails() >= maxRetries) {
+ LOG.debug("Reached the maximum number of retries. Adding
[{]} to list of messages to be committed to kafka", msgId);
+ ack(msgId);
+ removeFromFailed(tp, msgId);
+ } else {
+ addToFailed(tp, msgId);
--- End diff ---
Why don't we want to just re-emit that failed tuple right here?
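For illustration, here is a minimal sketch of the immediate re-emit alternative raised above, reusing the fields and helpers from this diff. The incrementNumFails() call is a hypothetical mutator (how MessageId counts fails is not shown in this excerpt), so treat this as a sketch of intent, not the PR's implementation:

    @Override
    public void fail(Object messageId) {
        final org.apache.storm.kafka.spout.MessageId msgId = (org.apache.storm.kafka.spout.MessageId) messageId;
        if (isInBlackList(msgId)) {
            removeFromBlacklist(msgId);               // tuple was in flight during a rebalance; drop it
        } else if (msgId.numFails() >= maxRetries) {
            ack(msgId);                               // give up after max retries so the offset can be committed
        } else {
            msgId.incrementNumFails();                // hypothetical helper; the fail-count API is not shown in this diff
            // re-emit right away instead of parking the tuple in the failed map for a later nextTuple() cycle
            collector.emit(kafkaSpoutStream.getStreamId(), emittedTuples.get(msgId), msgId);
        }
    }

Worth noting when weighing the two approaches: Storm invokes nextTuple(), ack(), and fail() on the same executor thread, so emitting from fail() is thread-safe; what deferring buys is a single place in nextTuple() to throttle and order retries before new records are polled.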
> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to
> reduce dependencies and use long term supported kafka apis
> --------------------------------------------------------------------------------------------------------------------------------------
>
> Key: STORM-822
> URL: https://issues.apache.org/jira/browse/STORM-822
> Project: Apache Storm
> Issue Type: Story
> Components: storm-kafka
> Reporter: Thomas Becker
> Assignee: Hugo Louro
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)