Repository: storm
Updated Branches:
  refs/heads/master 3f50b72a2 -> 13c33f335


While sending tuples, check task->node+port under the read lock

* We can ensure that reading task->node+port is safe while holding the read lock
** Refer to the write lock taken inside mk-refresh-connections
* Let TransferDrainer match each task to its node+port
** That way we can still enjoy the optimization of the sending logic


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1d7b3eb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1d7b3eb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1d7b3eb

Branch: refs/heads/master
Commit: a1d7b3eb343f304565fe24fb7e0151bfbcb3824e
Parents: 6b52e0d
Author: Jungtaek Lim <[email protected]>
Authored: Tue Apr 14 07:16:37 2015 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Tue Apr 14 07:16:37 2015 +0900

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/worker.clj    | 15 +++++-----
 .../backtype/storm/utils/TransferDrainer.java   | 29 +++++++++++---------
 2 files changed, 24 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a1d7b3eb/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj 
b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index e0263d6..3e72b39 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -139,12 +139,12 @@
                   (.add local pair) 
 
                   ;;Using java objects directly to avoid performance issues in 
java code
-                  (let [node+port (get @task->node+port task)]
-                    (when (not (.get remoteMap node+port))
-                      (.put remoteMap node+port (ArrayList.)))
-                    (let [remote (.get remoteMap node+port)]
+                  (let []
+                    (when (not (.get remoteMap task))
+                      (.put remoteMap task (ArrayList.)))
+                    (let [remote (.get remoteMap task)]
                       (.add remote (TaskMessage. task (.serialize serializer 
tuple)))
-                     )))) 
+                    ))))
                 (local-transfer local)
                 (disruptor/publish transfer-queue remoteMap)
               ))]
@@ -350,8 +350,9 @@
         
         (when batch-end?
           (read-locked endpoint-socket-lock
-            (let [node+port->socket @node+port->socket]
-              (.send drainer node+port->socket)))
+            (let [node+port->socket @node+port->socket
+                  task->node+port @task->node+port]
+              (.send drainer task->node+port node+port->socket)))
           (.clear drainer))))))
 
 ;; Check whether this messaging connection is ready to send data

http://git-wip-us.apache.org/repos/asf/storm/blob/a1d7b3eb/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java 
b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
index 0e53632..20b4545 100644
--- a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
+++ b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
@@ -26,10 +26,10 @@ import backtype.storm.messaging.TaskMessage;
 
 public class TransferDrainer {
 
-  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new 
HashMap();
+  private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new 
HashMap();
   
-  public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
-    for (String key : workerTupleSetMap.keySet()) {
+  public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
+    for (Integer key : taskTupleSetMap.keySet()) {
       
       ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
       if (null == bundle) {
@@ -37,24 +37,27 @@ public class TransferDrainer {
         bundles.put(key, bundle);
       }
       
-      ArrayList tupleSet = workerTupleSetMap.get(key);
+      ArrayList tupleSet = taskTupleSetMap.get(key);
       if (null != tupleSet && tupleSet.size() > 0) {
         bundle.add(tupleSet);
       }
     } 
   }
   
-  public void send(HashMap<String, IConnection> connections) {
-    for (String hostPort : bundles.keySet()) {
-      IConnection connection = connections.get(hostPort);
-      if (null != connection) { 
-        ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
-        Iterator<TaskMessage> iter = getBundleIterator(bundle);
-        if (null != iter && iter.hasNext()) {
-          connection.send(iter);
+  public void send(HashMap<Integer, String> taskToNode, HashMap<String, 
IConnection> connections) {
+    for (Integer task : bundles.keySet()) {
+      String hostPort = taskToNode.get(task);
+      if (hostPort != null) {
+        IConnection connection = connections.get(hostPort);
+        if (null != connection) {
+          ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(task);
+          Iterator<TaskMessage> iter = getBundleIterator(bundle);
+          if (null != iter && iter.hasNext()) {
+            connection.send(iter);
+          }
         }
       }
-    } 
+    }
   }
   
   private Iterator<TaskMessage> getBundleIterator(final 
ArrayList<ArrayList<TaskMessage>> bundle) {

Reply via email to