Repository: storm
Updated Branches:
  refs/heads/master f5a410ba3 -> 0f89cd8ab


STORM-3222: Fix KafkaSpout internals to use LinkedList instead of ArrayList

KafkaSpout internally maintains a waitingToEmit list per topic partition and 
keeps removing the first item to emit during each nextTuple. The implementation 
uses an ArrayList, which results in unnecessary traversal and copying for each 
tuple.

Also, I am not sure why nextTuple only emits a single tuple, whereas ideally 
it should emit whatever it can in a single nextTuple call, which is more 
efficient. However, the logic appears too complicated to refactor.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a5384aa8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a5384aa8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a5384aa8

Branch: refs/heads/master
Commit: a5384aa845496a7e584bd947cace968a18b7ffdf
Parents: f3c1437
Author: Arun Mahadevan <[email protected]>
Authored: Wed Sep 12 15:36:24 2018 -0700
Committer: Arun Mahadevan <[email protected]>
Committed: Wed Sep 12 15:37:52 2018 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java  | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a5384aa8/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index d6befd5..1ee0a5c 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -350,7 +351,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
         for (TopicPartition tp : consumerRecords.partitions()) {
-            waitingToEmit.put(tp, new 
ArrayList<>(consumerRecords.records(tp)));
+            waitingToEmit.put(tp, new 
LinkedList<>(consumerRecords.records(tp)));
         }
     }
 
@@ -536,7 +537,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                         //Discard the pending records that are already 
committed
                         waitingToEmit.put(tp, waitingToEmitForTp.stream()
                             .filter(record -> record.offset() >= 
committedOffset)
-                            .collect(Collectors.toList()));
+                            
.collect(Collectors.toCollection(LinkedList::new)));
                     }
                 }
 

Reply via email to