Repository: storm Updated Branches: refs/heads/master 3f50b72a2 -> 13c33f335
While sending tuple, check task->node+port with read lock * we can ensure task->node+port is safe within read lock ** refer write lock inside of mk-refresh-connections * Let TransferDrainer matches task to node+port ** So then we can still enjoy optimization of sending logic Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1d7b3eb Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1d7b3eb Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1d7b3eb Branch: refs/heads/master Commit: a1d7b3eb343f304565fe24fb7e0151bfbcb3824e Parents: 6b52e0d Author: Jungtaek Lim <[email protected]> Authored: Tue Apr 14 07:16:37 2015 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Tue Apr 14 07:16:37 2015 +0900 ---------------------------------------------------------------------- .../src/clj/backtype/storm/daemon/worker.clj | 15 +++++----- .../backtype/storm/utils/TransferDrainer.java | 29 +++++++++++--------- 2 files changed, 24 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/a1d7b3eb/storm-core/src/clj/backtype/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index e0263d6..3e72b39 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -139,12 +139,12 @@ (.add local pair) ;;Using java objects directly to avoid performance issues in java code - (let [node+port (get @task->node+port task)] - (when (not (.get remoteMap node+port)) - (.put remoteMap node+port (ArrayList.))) - (let [remote (.get remoteMap node+port)] + (let [] + (when (not (.get remoteMap task)) + (.put remoteMap task (ArrayList.))) + (let [remote (.get remoteMap task)] (.add remote (TaskMessage. task (.serialize serializer tuple))) - )))) + )))) (local-transfer local) (disruptor/publish transfer-queue remoteMap) ))] @@ -350,8 +350,9 @@ (when batch-end? (read-locked endpoint-socket-lock - (let [node+port->socket @node+port->socket] - (.send drainer node+port->socket))) + (let [node+port->socket @node+port->socket + task->node+port @task->node+port] + (.send drainer task->node+port node+port->socket))) (.clear drainer)))))) ;; Check whether this messaging connection is ready to send data http://git-wip-us.apache.org/repos/asf/storm/blob/a1d7b3eb/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java index 0e53632..20b4545 100644 --- a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java +++ b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java @@ -26,10 +26,10 @@ import backtype.storm.messaging.TaskMessage; public class TransferDrainer { - private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap(); + private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap(); - public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) { - for (String key : workerTupleSetMap.keySet()) { + public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) { + for (Integer key : taskTupleSetMap.keySet()) { ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key); if (null == bundle) { @@ -37,24 +37,27 @@ public class TransferDrainer { bundles.put(key, bundle); } - ArrayList tupleSet = workerTupleSetMap.get(key); + ArrayList tupleSet = taskTupleSetMap.get(key); if (null != tupleSet && tupleSet.size() > 0) { bundle.add(tupleSet); } } } - public void send(HashMap<String, IConnection> connections) { - for (String hostPort : bundles.keySet()) { - IConnection connection = connections.get(hostPort); - if (null != connection) { - ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort); - Iterator<TaskMessage> iter = getBundleIterator(bundle); - if (null != iter && iter.hasNext()) { - connection.send(iter); + public void send(HashMap<Integer, String> taskToNode, HashMap<String, IConnection> connections) { + for (Integer task : bundles.keySet()) { + String hostPort = taskToNode.get(task); + if (hostPort != null) { + IConnection connection = connections.get(hostPort); + if (null != connection) { + ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(task); + Iterator<TaskMessage> iter = getBundleIterator(bundle); + if (null != iter && iter.hasNext()) { + connection.send(iter); + } } } - } + } } private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
