Github user danny0405 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2704#discussion_r193437501
--- Diff: storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
---
@@ -40,94 +41,39 @@ public void add(TaskMessage taskMsg) {
}
public void send(Map<Integer, NodeInfo> taskToNode, Map<NodeInfo,
IConnection> connections) {
- HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>>
bundleMapByDestination = groupBundleByDestination(taskToNode);
-
- for (Map.Entry<NodeInfo, ArrayList<ArrayList<TaskMessage>>> entry
: bundleMapByDestination.entrySet()) {
+ HashMap<NodeInfo, Stream<TaskMessage>> bundleMapByDestination =
groupBundleByDestination(taskToNode);
+
+ for (Map.Entry<NodeInfo, Stream<TaskMessage>> entry :
bundleMapByDestination.entrySet()) {
NodeInfo node = entry.getKey();
IConnection conn = connections.get(node);
if (conn != null) {
- ArrayList<ArrayList<TaskMessage>> bundle =
entry.getValue();
- Iterator<TaskMessage> iter = getBundleIterator(bundle);
- if (null != iter && iter.hasNext()) {
+ Iterator<TaskMessage> iter = entry.getValue().iterator();
+ if (iter.hasNext()) {
conn.send(iter);
}
- entry.getValue().clear();
} else {
LOG.warn("Connection not available for hostPort {}", node);
}
}
}
- private HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>>
groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) {
- HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> result = new
HashMap<>();
-
+ private HashMap<NodeInfo, Stream<TaskMessage>>
groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) {
+ HashMap<NodeInfo, Stream<TaskMessage>> result = new HashMap<>();
+
for (Entry<Integer, ArrayList<TaskMessage>> entry :
bundles.entrySet()) {
--- End diff --
nit: whitespace
---