Repository: storm
Updated Branches:
  refs/heads/0.10.x-branch ed2e89117 -> 7e5e5f0e3


Merge branch 'STORM-737' of github.com:HeartSaVioR/storm

Conflicts:
        storm-core/src/clj/backtype/storm/daemon/worker.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/acc1a79d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/acc1a79d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/acc1a79d

Branch: refs/heads/0.10.x-branch
Commit: acc1a79de17802c59810193311aa182b963e8511
Parents: ed2e891
Author: P. Taylor Goetz <[email protected]>
Authored: Fri May 29 13:53:39 2015 -0400
Committer: P. Taylor Goetz <[email protected]>
Committed: Fri May 29 13:58:23 2015 -0400

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/worker.clj    | 13 ++--
 .../backtype/storm/utils/TransferDrainer.java   | 62 +++++++++++++-------
 2 files changed, 48 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/acc1a79d/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index a83146b..bd5b9c1 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -129,10 +129,10 @@
                   (.add local pair) 
 
                   ;;Using java objects directly to avoid performance issues in java code
-                  (let [node+port (get @task->node+port task)]
-                    (when (not (.get remoteMap node+port))
-                      (.put remoteMap node+port (ArrayList.)))
-                    (let [remote (.get remoteMap node+port)]
+                  (do
+                    (when (not (.get remoteMap task))
+                      (.put remoteMap task (ArrayList.)))
+                    (let [remote (.get remoteMap task)]
                       (if (not-nil? task)
                         (.add remote (TaskMessage. task (.serialize serializer tuple)))
                         (log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
@@ -342,8 +342,9 @@
         
         (when batch-end?
           (read-locked endpoint-socket-lock
-            (let [node+port->socket @node+port->socket]
-              (.send drainer node+port->socket)))
+             (let [node+port->socket @node+port->socket
+                   task->node+port @task->node+port]
+               (.send drainer task->node+port node+port->socket)))
           (.clear drainer))))))
 
 ;; Check whether this messaging connection is ready to send data

http://git-wip-us.apache.org/repos/asf/storm/blob/acc1a79d/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
index 0e53632..5a111ce 100644
--- a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
+++ b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
@@ -23,40 +23,60 @@ import java.util.Iterator;
 
 import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.TaskMessage;
+import com.google.common.collect.Maps;
 
 public class TransferDrainer {
 
-  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
+  private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
   
-  public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
-    for (String key : workerTupleSetMap.keySet()) {
-      
-      ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
-      if (null == bundle) {
-        bundle = new ArrayList<ArrayList<TaskMessage>>();
-        bundles.put(key, bundle);
-      }
-      
-      ArrayList tupleSet = workerTupleSetMap.get(key);
-      if (null != tupleSet && tupleSet.size() > 0) {
-        bundle.add(tupleSet);
-      }
-    } 
+  public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
+    for (Integer task : taskTupleSetMap.keySet()) {
+      addListRefToMap(this.bundles, task, taskTupleSetMap.get(task));
+    }
   }
   
-  public void send(HashMap<String, IConnection> connections) {
-    for (String hostPort : bundles.keySet()) {
+  public void send(HashMap<Integer, String> taskToNode, HashMap<String, IConnection> connections) {
+    HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
+
+    for (String hostPort : bundleMapByDestination.keySet()) {
       IConnection connection = connections.get(hostPort);
-      if (null != connection) { 
-        ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
+      if (null != connection) {
+        ArrayList<ArrayList<TaskMessage>> bundle = bundleMapByDestination.get(hostPort);
         Iterator<TaskMessage> iter = getBundleIterator(bundle);
         if (null != iter && iter.hasNext()) {
           connection.send(iter);
         }
       }
-    } 
+    }
   }
-  
+
+  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(HashMap<Integer, String> taskToNode) {
+    HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMap = Maps.newHashMap();
+    for (Integer task : this.bundles.keySet()) {
+      String hostPort = taskToNode.get(task);
+      if (hostPort != null) {
+        for (ArrayList<TaskMessage> chunk : this.bundles.get(task)) {
+          addListRefToMap(bundleMap, hostPort, chunk);
+        }
+      }
+    }
+    return bundleMap;
+  }
+
+  private <T> void addListRefToMap(HashMap<T, ArrayList<ArrayList<TaskMessage>>> bundleMap,
+                                   T key, ArrayList<TaskMessage> tuples) {
+    ArrayList<ArrayList<TaskMessage>> bundle = bundleMap.get(key);
+
+    if (null == bundle) {
+      bundle = new ArrayList<ArrayList<TaskMessage>>();
+      bundleMap.put(key, bundle);
+    }
+
+    if (null != tuples && tuples.size() > 0) {
+      bundle.add(tuples);
+    }
+  }
+
   private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
     
     if (null == bundle) {

Reply via email to