Repository: storm Updated Branches: refs/heads/master f5a410ba3 -> 0f89cd8ab
STORM-3222: Fix KafkaSpout internals to use LinkedList instead of ArrayList. KafkaSpout internally maintains a waitingToEmit list per topic partition and removes the first item from that list on each nextTuple call. The implementation used an ArrayList, so every removal from the head of the list shifted (copied) all remaining elements, an unnecessary cost paid for each tuple. Also, it is unclear why nextTuple emits only a single tuple, whereas ideally it should emit whatever it can in a single nextTuple call, which would be more efficient. However, that logic appears too complicated to refactor here. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a5384aa8 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a5384aa8 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a5384aa8 Branch: refs/heads/master Commit: a5384aa845496a7e584bd947cace968a18b7ffdf Parents: f3c1437 Author: Arun Mahadevan <[email protected]> Authored: Wed Sep 12 15:36:24 2018 -0700 Committer: Arun Mahadevan <[email protected]> Committed: Wed Sep 12 15:37:52 2018 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/a5384aa8/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index d6befd5..1ee0a5c 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -30,6 +30,7 @@ import java.util.Collections; import 
java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -350,7 +351,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) { for (TopicPartition tp : consumerRecords.partitions()) { - waitingToEmit.put(tp, new ArrayList<>(consumerRecords.records(tp))); + waitingToEmit.put(tp, new LinkedList<>(consumerRecords.records(tp))); } } @@ -536,7 +537,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { //Discard the pending records that are already committed waitingToEmit.put(tp, waitingToEmitForTp.stream() .filter(record -> record.offset() >= committedOffset) - .collect(Collectors.toList())); + .collect(Collectors.toCollection(LinkedList::new))); } }
