fix nimbus test failure Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/defcb960 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/defcb960 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/defcb960
Branch: refs/heads/master Commit: defcb9601d8f4d287fa4f1d6de7ee43d8183b137 Parents: 72d409c Author: Xin Wang <[email protected]> Authored: Sun Feb 21 17:20:09 2016 +0800 Committer: Xin Wang <[email protected]> Committed: Sun Feb 21 17:20:09 2016 +0800 ---------------------------------------------------------------------- .../apache/storm/scheduler/EvenScheduler.java | 29 +++++++++++------ .../clj/org/apache/storm/scheduler_test.clj | 34 +++++--------------- 2 files changed, 28 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/defcb960/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java index 2e8565b..d91e187 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java @@ -22,10 +22,8 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; @@ -38,16 +36,29 @@ import com.google.common.collect.Sets; public class EvenScheduler implements IScheduler { private static final Logger LOG = LoggerFactory.getLogger(EvenScheduler.class); - public static List<WorkerSlot> sortSlots(List<WorkerSlot> availableSlots, Cluster cluster) { + public static List<WorkerSlot> sortSlots(List<WorkerSlot> availableSlots) { + //For example, we have a three nodes(supervisor1, supervisor2, supervisor3) cluster: + //slots before sort: + //supervisor1:6700, supervisor1:6701, + //supervisor2:6700, supervisor2:6701, supervisor2:6702, + //supervisor3:6700, supervisor3:6703, supervisor3:6702, supervisor3:6701 + //slots after sort: + //supervisor3:6700, supervisor2:6700, supervisor1:6700, + //supervisor3:6701, supervisor2:6701, supervisor1:6701, + //supervisor3:6702, supervisor2:6702, + //supervisor3:6703 + if (availableSlots != null && availableSlots.size() > 0) { // group by node Map<String, List<WorkerSlot>> slotGroups = new TreeMap<String, List<WorkerSlot>>(); for (WorkerSlot slot : availableSlots) { - String host = cluster.getHost(slot.getNodeId()); - List<WorkerSlot> slots = slotGroups.get(host); - if (slots == null) { - slots = new ArrayList<WorkerSlot>(); - slotGroups.put(host, slots); + String node = slot.getNodeId(); + List<WorkerSlot> slots = null; + if(slotGroups.containsKey(node)){ + slots = slotGroups.get(node); + }else{ + slots = new ArrayList<WorkerSlot>(); + slotGroups.put(node, slots); } slots.add(slot); } @@ -93,7 +104,7 @@ public class EvenScheduler implements IScheduler { Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned = getAliveAssignedWorkerSlotExecutors(cluster, topology.getId()); int totalSlotsToUse = Math.min(topology.getNumWorkers(), availableSlots.size() + aliveAssigned.size()); - List<WorkerSlot> sortedList = sortSlots(availableSlots, cluster); + List<WorkerSlot> sortedList = sortSlots(availableSlots); if (sortedList == null || sortedList.size() < (totalSlotsToUse - aliveAssigned.size())) { LOG.error("Available slots are not enough for topology: {}", topology.getName()); return new HashMap<ExecutorDetails, WorkerSlot>(); http://git-wip-us.apache.org/repos/asf/storm/blob/defcb960/storm-core/test/clj/org/apache/storm/scheduler_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler_test.clj index b14af71..0d74daf 100644 --- a/storm-core/test/clj/org/apache/storm/scheduler_test.clj +++ b/storm-core/test/clj/org/apache/storm/scheduler_test.clj @@ -261,34 +261,16 @@ )) (deftest test-sort-slots - (let [supervisor1 (SupervisorDetails. "supervisor1" "192.168.0.1" (list ) (map int (list 6700 6701))) - supervisor2 (SupervisorDetails. "supervisor2" "192.168.0.2" (list ) (map int (list 6700 6701 6702))) - supervisor3 (SupervisorDetails. "supervisor3" "192.168.0.3" (list ) (map int (list 6700 6701 6702 6703))) - assignment1 (SchedulerAssignmentImpl. "topology1" nil) - assignment2 (SchedulerAssignmentImpl. "topology2" nil) - supervisor1-slot0 (WorkerSlot. "supervisor1" 6700) - supervisor1-slot1 (WorkerSlot. "supervisor1" 6701) - supervisor2-slot0 (WorkerSlot. "supervisor2" 6700) - supervisor2-slot1 (WorkerSlot. "supervisor2" 6701) - supervisor2-slot2 (WorkerSlot. "supervisor2" 6702) - supervisor3-slot0 (WorkerSlot. "supervisor3" 6700) - supervisor3-slot1 (WorkerSlot. "supervisor3" 6701) - supervisor3-slot2 (WorkerSlot. "supervisor3" 6702) - supervisor3-slot3 (WorkerSlot. "supervisor3" 6703) - cluster (Cluster. (nimbus/standalone-nimbus) - {"supervisor1" supervisor1 "supervisor2" supervisor2 "supervisor3" supervisor3} - {"topology1" assignment1 "topology2" assignment2} - nil)] ;; test supervisor2 has more free slots (is (= "[supervisor2:6700, supervisor1:6700, supervisor2:6701, supervisor1:6701, supervisor2:6702]" - (.toString (EvenScheduler/sortSlots [supervisor1-slot0 supervisor1-slot1 - supervisor2-slot0 supervisor2-slot1 supervisor2-slot2 - ] cluster)))) + (.toString (EvenScheduler/sortSlots [(WorkerSlot. "supervisor1" 6700) (WorkerSlot. "supervisor1" 6701) + (WorkerSlot. "supervisor2" 6700) (WorkerSlot. "supervisor2" 6701) (WorkerSlot. "supervisor2" 6702) + ])))) ;; test supervisor3 has more free slots (is (= "[supervisor3:6700, supervisor2:6700, supervisor1:6700, supervisor3:6701, supervisor2:6701, supervisor1:6701, supervisor3:6702, supervisor2:6702, supervisor3:6703]" - (.toString (EvenScheduler/sortSlots [supervisor1-slot0 supervisor1-slot1 - supervisor2-slot0 supervisor2-slot1 supervisor2-slot2 - supervisor3-slot0 supervisor3-slot3 supervisor3-slot2 supervisor3-slot1 - ] cluster)))) - )) + (.toString (EvenScheduler/sortSlots [(WorkerSlot. "supervisor1" 6700) (WorkerSlot. "supervisor1" 6701) + (WorkerSlot. "supervisor2" 6700) (WorkerSlot. "supervisor2" 6701) (WorkerSlot. "supervisor2" 6702) + (WorkerSlot. "supervisor3" 6700) (WorkerSlot. "supervisor3" 6703) (WorkerSlot. "supervisor3" 6702) (WorkerSlot. "supervisor3" 6701) + ])))) + )
