Address review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f9184624 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f9184624 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f9184624 Branch: refs/heads/master Commit: f9184624f18094beb1bb7e9f60e5665225dfb0ee Parents: 9b74f2e Author: vesense <[email protected]> Authored: Wed Feb 17 13:13:34 2016 +0800 Committer: vesense <[email protected]> Committed: Wed Feb 17 13:22:41 2016 +0800 ---------------------------------------------------------------------- .../storm/scheduler/DefaultScheduler.java | 21 ++++++------ .../apache/storm/scheduler/EvenScheduler.java | 36 +++++--------------- .../src/jvm/org/apache/storm/utils/Utils.java | 2 +- 3 files changed, 20 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/f9184624/storm-core/src/jvm/org/apache/storm/scheduler/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/DefaultScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/DefaultScheduler.java index e9cd180..774e8fd 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/DefaultScheduler.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/DefaultScheduler.java @@ -31,18 +31,19 @@ public class DefaultScheduler implements IScheduler { private static Set<WorkerSlot> badSlots(Map<WorkerSlot, List<ExecutorDetails>> existingSlots, int numExecutors, int numWorkers) { if (numWorkers != 0) { Map<Integer, Integer> distribution = Utils.integerDivided(numExecutors, numWorkers); - Set<WorkerSlot> _slots = new HashSet<WorkerSlot>(); + Set<WorkerSlot> slots = new HashSet<WorkerSlot>(); for (Entry<WorkerSlot, List<ExecutorDetails>> entry : existingSlots.entrySet()) { - Integer executorCount = distribution.get(entry.getValue().size()); - if (executorCount != null && executorCount > 0) { - _slots.add(entry.getKey()); + Integer executorCount = entry.getValue().size(); + Integer workerCount = distribution.get(executorCount); + if (workerCount != null && workerCount > 0) { + slots.add(entry.getKey()); executorCount--; - distribution.put(entry.getValue().size(), executorCount); + distribution.put(executorCount, workerCount); } } - for (WorkerSlot slot : _slots) { + for (WorkerSlot slot : slots) { existingSlots.remove(slot); } @@ -83,12 +84,12 @@ public class DefaultScheduler implements IScheduler { Set<WorkerSlot> canReassignSlots = slotsCanReassign(cluster, aliveAssigned.keySet()); int totalSlotsToUse = Math.min(topology.getNumWorkers(), canReassignSlots.size() + availableSlots.size()); - Set<WorkerSlot> badSlot = null; + Set<WorkerSlot> badSlots = null; if (totalSlotsToUse > aliveAssigned.size() || !allExecutors.equals(aliveExecutors)) { - badSlot = badSlots(aliveAssigned, allExecutors.size(), totalSlotsToUse); + badSlots = badSlots(aliveAssigned, allExecutors.size(), totalSlotsToUse); } - if (badSlot != null) { - cluster.freeSlots(badSlot); + if (badSlots != null) { + cluster.freeSlots(badSlots); } Map<String, TopologyDetails> _topologies = new HashMap<String, TopologyDetails>(); http://git-wip-us.apache.org/repos/asf/storm/blob/f9184624/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java index a29d45f..2e8565b 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java @@ -52,7 +52,7 @@ public class EvenScheduler implements IScheduler { slots.add(slot); } - // sort by port + // sort by port: from small to large for (List<WorkerSlot> slots : slotGroups.values()) { Collections.sort(slots, new Comparator<WorkerSlot>() { @Override @@ -62,7 +62,7 @@ public class EvenScheduler implements IScheduler { }); } - // sort by count + // sort by available slots size: from large to small List<List<WorkerSlot>> list = new ArrayList<List<WorkerSlot>>(slotGroups.values()); Collections.sort(list, new Comparator<List<WorkerSlot>>() { @Override @@ -84,18 +84,7 @@ public class EvenScheduler implements IScheduler { executorToSlot = existingAssignment.getExecutorToSlot(); } - Map<WorkerSlot, List<ExecutorDetails>> result = new HashMap<WorkerSlot, List<ExecutorDetails>>(); - if (executorToSlot != null) { - for (Entry<ExecutorDetails, WorkerSlot> entry : executorToSlot.entrySet()) { - List<ExecutorDetails> list = result.get(entry.getValue()); - if (list == null) { - list = new ArrayList<ExecutorDetails>(); - result.put(entry.getValue(), list); - } - list.add(entry.getKey()); - } - } - return result; + return Utils.reverseMap(executorToSlot); } private static Map<ExecutorDetails, WorkerSlot> scheduleTopology(TopologyDetails topology, Cluster cluster) { @@ -105,7 +94,7 @@ public class EvenScheduler implements IScheduler { int totalSlotsToUse = Math.min(topology.getNumWorkers(), availableSlots.size() + aliveAssigned.size()); List<WorkerSlot> sortedList = sortSlots(availableSlots, cluster); - if (sortedList == null) { + if (sortedList == null || sortedList.size() < (totalSlotsToUse - aliveAssigned.size())) { LOG.error("Available slots are not enough for topology: {}", topology.getName()); return new HashMap<ExecutorDetails, WorkerSlot>(); } @@ -122,25 +111,16 @@ public class EvenScheduler implements IScheduler { return reassignment; } - List<ExecutorDetails> _executors = new ArrayList<ExecutorDetails>(reassignExecutors); - Collections.sort(_executors, new Comparator<ExecutorDetails>() { + List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>(reassignExecutors); + Collections.sort(executors, new Comparator<ExecutorDetails>() { @Override public int compare(ExecutorDetails o1, ExecutorDetails o2) { return o1.getStartTask() - o2.getStartTask(); } }); - int numExecutors = _executors.size(); - List<WorkerSlot> _slots = new ArrayList<WorkerSlot>(numExecutors); - int numSlots = reassignSlots.size(); - for (int i = 0; i < numExecutors; i++) { - _slots.add(reassignSlots.get(i % numSlots)); - } - - Iterator<WorkerSlot> slotIterator = _slots.iterator(); - Iterator<ExecutorDetails> executorIterator = _executors.iterator(); - while (slotIterator.hasNext() && executorIterator.hasNext()) { - reassignment.put(executorIterator.next(), slotIterator.next()); + for (int i = 0; i < executors.size(); i++) { + reassignment.put(executors.get(i), reassignSlots.get(i % reassignSlots.size())); } if (reassignment.size() != 0) { http://git-wip-us.apache.org/repos/asf/storm/blob/f9184624/storm-core/src/jvm/org/apache/storm/utils/Utils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java index b1f59f2..ae3d387 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java @@ -2254,7 +2254,7 @@ public class Utils { List<T> first = new ArrayList<T>(); List<List<T>> rest = new ArrayList<List<T>>(); for (List<T> node : nodeList) { - if (null != node && node.size() > 0) { + if (node != null && node.size() > 0) { first.add(node.get(0)); rest.add(node.subList(1, node.size())); }
