Apply addressed comments from review
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85c5096e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85c5096e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85c5096e Branch: refs/heads/master Commit: 85c5096e0806ac872739d8b73574a8f1e5d31679 Parents: 52bd47b Author: Jungtaek Lim <[email protected]> Authored: Thu May 21 06:30:23 2015 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Thu May 21 06:30:23 2015 +0900 ---------------------------------------------------------------------- .../src/clj/backtype/storm/daemon/worker.clj | 4 ++-- .../backtype/storm/utils/TransferDrainer.java | 20 +++++++++----------- 2 files changed, 11 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/85c5096e/storm-core/src/clj/backtype/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 6da8692..fe64474 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -129,12 +129,12 @@ (.add local pair) ;;Using java objects directly to avoid performance issues in java code - (let [] + (do (when (not (.get remoteMap task)) (.put remoteMap task (ArrayList.))) (let [remote (.get remoteMap task)] (.add remote (TaskMessage. task (.serialize serializer tuple))) - )))) + )))) (local-transfer local) (disruptor/publish transfer-queue remoteMap) ))] http://git-wip-us.apache.org/repos/asf/storm/blob/85c5096e/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java index 6130bfe..5a111ce 100644 --- a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java +++ b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java @@ -39,14 +39,12 @@ public class TransferDrainer { HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode); for (String hostPort : bundleMapByDestination.keySet()) { - if (hostPort != null) { - IConnection connection = connections.get(hostPort); - if (null != connection) { - ArrayList<ArrayList<TaskMessage>> bundle = bundleMapByDestination.get(hostPort); - Iterator<TaskMessage> iter = getBundleIterator(bundle); - if (null != iter && iter.hasNext()) { - connection.send(iter); - } + IConnection connection = connections.get(hostPort); + if (null != connection) { + ArrayList<ArrayList<TaskMessage>> bundle = bundleMapByDestination.get(hostPort); + Iterator<TaskMessage> iter = getBundleIterator(bundle); + if (null != iter && iter.hasNext()) { + connection.send(iter); } } } @@ -65,13 +63,13 @@ public class TransferDrainer { return bundleMap; } - private <T> void addListRefToMap(HashMap<T, ArrayList<ArrayList<TaskMessage>>> bundles, + private <T> void addListRefToMap(HashMap<T, ArrayList<ArrayList<TaskMessage>>> bundleMap, T key, ArrayList<TaskMessage> tuples) { - ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key); + ArrayList<ArrayList<TaskMessage>> bundle = bundleMap.get(key); if (null == bundle) { bundle = new ArrayList<ArrayList<TaskMessage>>(); - bundles.put(key, bundle); + bundleMap.put(key, bundle); } if (null != tuples && tuples.size() > 0) {
