Github user danny0405 commented on a diff in the pull request: https://github.com/apache/storm/pull/2704#discussion_r193437501 --- Diff: storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java --- @@ -40,94 +41,39 @@ public void add(TaskMessage taskMsg) { } public void send(Map<Integer, NodeInfo> taskToNode, Map<NodeInfo, IConnection> connections) { - HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode); - - for (Map.Entry<NodeInfo, ArrayList<ArrayList<TaskMessage>>> entry : bundleMapByDestination.entrySet()) { + HashMap<NodeInfo, Stream<TaskMessage>> bundleMapByDestination = groupBundleByDestination(taskToNode); + + for (Map.Entry<NodeInfo, Stream<TaskMessage>> entry : bundleMapByDestination.entrySet()) { NodeInfo node = entry.getKey(); IConnection conn = connections.get(node); if (conn != null) { - ArrayList<ArrayList<TaskMessage>> bundle = entry.getValue(); - Iterator<TaskMessage> iter = getBundleIterator(bundle); - if (null != iter && iter.hasNext()) { + Iterator<TaskMessage> iter = entry.getValue().iterator(); + if (iter.hasNext()) { conn.send(iter); } - entry.getValue().clear(); } else { LOG.warn("Connection not available for hostPort {}", node); } } } - private HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) { - HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> result = new HashMap<>(); - + private HashMap<NodeInfo, Stream<TaskMessage>> groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) { + HashMap<NodeInfo, Stream<TaskMessage>> result = new HashMap<>(); + for (Entry<Integer, ArrayList<TaskMessage>> entry : bundles.entrySet()) { --- End diff -- nit: whitespace
---