Repository: storm Updated Branches: refs/heads/1.x-branch fe56cfc5a -> b230decc6
STORM-3222: Fix KafkaSpout internals to use LinkedList instead of ArrayList KafkaSpout internally maintains a waitingToEmit list per topic partition and keeps removing the first item to emit during each nextTuple. The implementation uses an ArrayList, which results in unnecessary traversal and copying for each tuple. Also, I am not sure why nextTuple only emits a single tuple, whereas ideally it should emit whatever it can in a single nextTuple call, which is more efficient. However, the logic appears too complicated to refactor. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2e0a01e5 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2e0a01e5 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2e0a01e5 Branch: refs/heads/1.x-branch Commit: 2e0a01e5e4e36169ef2a45bf5dcd72792231ee07 Parents: fe56cfc Author: Arun Mahadevan <[email protected]> Authored: Wed Sep 12 15:36:24 2018 -0700 Committer: Arun Mahadevan <[email protected]> Committed: Fri Sep 14 11:39:35 2018 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/2e0a01e5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 805bbd2..9bb77b8 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -30,6 +30,7 @@ import java.util.Collections; 
import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -364,7 +365,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) { for (TopicPartition tp : consumerRecords.partitions()) { - waitingToEmit.put(tp, new ArrayList<>(consumerRecords.records(tp))); + waitingToEmit.put(tp, new LinkedList<>(consumerRecords.records(tp))); } } @@ -551,7 +552,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmit.get(tp); if (waitingToEmitForTp != null) { //Discard the pending records that are already committed - List<ConsumerRecord<K, V>> filteredRecords = new ArrayList<>(); + List<ConsumerRecord<K, V>> filteredRecords = new LinkedList<>(); for (ConsumerRecord<K, V> record : waitingToEmitForTp) { if (record.offset() >= committedOffset) { filteredRecords.add(record);
