Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2704#discussion_r193437501
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java 
---
    @@ -40,94 +41,39 @@ public void add(TaskMessage taskMsg) {
         }
     
         public void send(Map<Integer, NodeInfo> taskToNode, Map<NodeInfo, 
IConnection> connections) {
    -        HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> 
bundleMapByDestination = groupBundleByDestination(taskToNode);
    -
    -        for (Map.Entry<NodeInfo, ArrayList<ArrayList<TaskMessage>>> entry 
: bundleMapByDestination.entrySet()) {
    +        HashMap<NodeInfo, Stream<TaskMessage>> bundleMapByDestination = 
groupBundleByDestination(taskToNode);
    +        
    +        for (Map.Entry<NodeInfo, Stream<TaskMessage>> entry : 
bundleMapByDestination.entrySet()) {
                 NodeInfo node = entry.getKey();
                 IConnection conn = connections.get(node);
                 if (conn != null) {
    -                ArrayList<ArrayList<TaskMessage>> bundle = 
entry.getValue();
    -                Iterator<TaskMessage> iter = getBundleIterator(bundle);
    -                if (null != iter && iter.hasNext()) {
    +                Iterator<TaskMessage> iter = entry.getValue().iterator();
    +                if (iter.hasNext()) {
                         conn.send(iter);
                     }
    -                entry.getValue().clear();
                 } else {
                     LOG.warn("Connection not available for hostPort {}", node);
                 }
             }
         }
     
    -    private HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> 
groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) {
    -        HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> result = new 
HashMap<>();
    -
    +    private HashMap<NodeInfo, Stream<TaskMessage>> 
groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) {
    +        HashMap<NodeInfo, Stream<TaskMessage>> result = new HashMap<>();
    +        
             for (Entry<Integer, ArrayList<TaskMessage>> entry : 
bundles.entrySet()) {
    --- End diff --
    
    nit: whitespace (the added blank line above contains only trailing spaces)


---

Reply via email to