Change TransferDrainer to re-group messages by destination when sending
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85af1950 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85af1950 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85af1950 Branch: refs/heads/master Commit: 85af195049fd1229acc62a1b8638415c06b6cf9d Parents: a1d7b3e Author: Jungtaek Lim <[email protected]> Authored: Thu May 14 06:21:32 2015 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Thu May 14 06:21:32 2015 +0900 ---------------------------------------------------------------------- .../backtype/storm/utils/TransferDrainer.java | 53 +++++++++++++------- 1 file changed, 36 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/85af1950/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java index 20b4545..6130bfe 100644 --- a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java +++ b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java @@ -23,34 +23,26 @@ import java.util.Iterator; import backtype.storm.messaging.IConnection; import backtype.storm.messaging.TaskMessage; +import com.google.common.collect.Maps; public class TransferDrainer { private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap(); public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) { - for (Integer key : taskTupleSetMap.keySet()) { - - ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key); - if (null == bundle) { - bundle = new ArrayList<ArrayList<TaskMessage>>(); - bundles.put(key, bundle); - } - - ArrayList tupleSet = taskTupleSetMap.get(key); - if (null != tupleSet && tupleSet.size() > 0) { - bundle.add(tupleSet); - } - } + 
for (Integer task : taskTupleSetMap.keySet()) { + addListRefToMap(this.bundles, task, taskTupleSetMap.get(task)); + } } public void send(HashMap<Integer, String> taskToNode, HashMap<String, IConnection> connections) { - for (Integer task : bundles.keySet()) { - String hostPort = taskToNode.get(task); + HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode); + + for (String hostPort : bundleMapByDestination.keySet()) { if (hostPort != null) { IConnection connection = connections.get(hostPort); if (null != connection) { - ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(task); + ArrayList<ArrayList<TaskMessage>> bundle = bundleMapByDestination.get(hostPort); Iterator<TaskMessage> iter = getBundleIterator(bundle); if (null != iter && iter.hasNext()) { connection.send(iter); @@ -59,7 +51,34 @@ public class TransferDrainer { } } } - + + private HashMap<String, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(HashMap<Integer, String> taskToNode) { + HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMap = Maps.newHashMap(); + for (Integer task : this.bundles.keySet()) { + String hostPort = taskToNode.get(task); + if (hostPort != null) { + for (ArrayList<TaskMessage> chunk : this.bundles.get(task)) { + addListRefToMap(bundleMap, hostPort, chunk); + } + } + } + return bundleMap; + } + + private <T> void addListRefToMap(HashMap<T, ArrayList<ArrayList<TaskMessage>>> bundles, + T key, ArrayList<TaskMessage> tuples) { + ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key); + + if (null == bundle) { + bundle = new ArrayList<ArrayList<TaskMessage>>(); + bundles.put(key, bundle); + } + + if (null != tuples && tuples.size() > 0) { + bundle.add(tuples); + } + } + private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) { if (null == bundle) {
