Repository: storm
Updated Branches:
  refs/heads/1.x-branch fe56cfc5a -> b230decc6


STORM-3222: Fix KafkaSpout internals to use LinkedList instead of ArrayList

KafkaSpout internally maintains a waitingToEmit list per topic partition and
repeatedly removes the first item to emit on each nextTuple call. The
implementation uses an ArrayList, so every removal from the head of the list
shifts (copies) all remaining elements, which is unnecessary work for each
emitted tuple.

Also, I am not sure why nextTuple emits only a single tuple, whereas ideally
it should emit as many tuples as it can in a single nextTuple call, which is
more efficient. However, that logic appears too complicated to refactor here.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2e0a01e5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2e0a01e5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2e0a01e5

Branch: refs/heads/1.x-branch
Commit: 2e0a01e5e4e36169ef2a45bf5dcd72792231ee07
Parents: fe56cfc
Author: Arun Mahadevan <[email protected]>
Authored: Wed Sep 12 15:36:24 2018 -0700
Committer: Arun Mahadevan <[email protected]>
Committed: Fri Sep 14 11:39:35 2018 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java  | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2e0a01e5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 805bbd2..9bb77b8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -364,7 +365,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
         for (TopicPartition tp : consumerRecords.partitions()) {
-            waitingToEmit.put(tp, new 
ArrayList<>(consumerRecords.records(tp)));
+            waitingToEmit.put(tp, new 
LinkedList<>(consumerRecords.records(tp)));
         }
     }
 
@@ -551,7 +552,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     List<ConsumerRecord<K, V>> waitingToEmitForTp = 
waitingToEmit.get(tp);
                     if (waitingToEmitForTp != null) {
                         //Discard the pending records that are already 
committed
-                        List<ConsumerRecord<K, V>> filteredRecords = new 
ArrayList<>();
+                        List<ConsumerRecord<K, V>> filteredRecords = new 
LinkedList<>();
                         for (ConsumerRecord<K, V> record : waitingToEmitForTp) 
{
                             if (record.offset() >= committedOffset) {
                                 filteredRecords.add(record);

Reply via email to