Address review comments

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f9184624
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f9184624
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f9184624

Branch: refs/heads/master
Commit: f9184624f18094beb1bb7e9f60e5665225dfb0ee
Parents: 9b74f2e
Author: vesense <[email protected]>
Authored: Wed Feb 17 13:13:34 2016 +0800
Committer: vesense <[email protected]>
Committed: Wed Feb 17 13:22:41 2016 +0800

----------------------------------------------------------------------
 .../storm/scheduler/DefaultScheduler.java       | 21 ++++++------
 .../apache/storm/scheduler/EvenScheduler.java   | 36 +++++---------------
 .../src/jvm/org/apache/storm/utils/Utils.java   |  2 +-
 3 files changed, 20 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f9184624/storm-core/src/jvm/org/apache/storm/scheduler/DefaultScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/scheduler/DefaultScheduler.java 
b/storm-core/src/jvm/org/apache/storm/scheduler/DefaultScheduler.java
index e9cd180..774e8fd 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/DefaultScheduler.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/DefaultScheduler.java
@@ -31,18 +31,19 @@ public class DefaultScheduler implements IScheduler {
     private static Set<WorkerSlot> badSlots(Map<WorkerSlot, 
List<ExecutorDetails>> existingSlots, int numExecutors, int numWorkers) {
         if (numWorkers != 0) {
             Map<Integer, Integer> distribution = 
Utils.integerDivided(numExecutors, numWorkers);
-            Set<WorkerSlot> _slots = new HashSet<WorkerSlot>();
+            Set<WorkerSlot> slots = new HashSet<WorkerSlot>();
 
             for (Entry<WorkerSlot, List<ExecutorDetails>> entry : 
existingSlots.entrySet()) {
-                Integer executorCount = 
distribution.get(entry.getValue().size());
-                if (executorCount != null && executorCount > 0) {
-                    _slots.add(entry.getKey());
+                Integer executorCount = entry.getValue().size();
+                Integer workerCount = distribution.get(executorCount);
+                if (workerCount != null && workerCount > 0) {
+                    slots.add(entry.getKey());
                     executorCount--;
-                    distribution.put(entry.getValue().size(), executorCount);
+                    distribution.put(executorCount, workerCount);
                 }
             }
 
-            for (WorkerSlot slot : _slots) {
+            for (WorkerSlot slot : slots) {
                 existingSlots.remove(slot);                
             }
 
@@ -83,12 +84,12 @@ public class DefaultScheduler implements IScheduler {
             Set<WorkerSlot> canReassignSlots = slotsCanReassign(cluster, 
aliveAssigned.keySet());
             int totalSlotsToUse = Math.min(topology.getNumWorkers(), 
canReassignSlots.size() + availableSlots.size());
 
-            Set<WorkerSlot> badSlot = null;
+            Set<WorkerSlot> badSlots = null;
             if (totalSlotsToUse > aliveAssigned.size() || 
!allExecutors.equals(aliveExecutors)) {
-                badSlot = badSlots(aliveAssigned, allExecutors.size(), 
totalSlotsToUse);                
+                badSlots = badSlots(aliveAssigned, allExecutors.size(), 
totalSlotsToUse);                
             }
-            if (badSlot != null) {
-                cluster.freeSlots(badSlot);                
+            if (badSlots != null) {
+                cluster.freeSlots(badSlots);                
             }
 
             Map<String, TopologyDetails> _topologies = new HashMap<String, 
TopologyDetails>();

http://git-wip-us.apache.org/repos/asf/storm/blob/f9184624/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java 
b/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java
index a29d45f..2e8565b 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java
@@ -52,7 +52,7 @@ public class EvenScheduler implements IScheduler {
                 slots.add(slot);
             }
 
-            // sort by port
+            // sort by port: from small to large
             for (List<WorkerSlot> slots : slotGroups.values()) {
                 Collections.sort(slots, new Comparator<WorkerSlot>() {
                     @Override
@@ -62,7 +62,7 @@ public class EvenScheduler implements IScheduler {
                 });
             }
 
-            // sort by count
+            // sort by available slots size: from large to small
             List<List<WorkerSlot>> list = new 
ArrayList<List<WorkerSlot>>(slotGroups.values());
             Collections.sort(list, new Comparator<List<WorkerSlot>>() {
                 @Override
@@ -84,18 +84,7 @@ public class EvenScheduler implements IScheduler {
             executorToSlot = existingAssignment.getExecutorToSlot();
         }
 
-        Map<WorkerSlot, List<ExecutorDetails>> result = new 
HashMap<WorkerSlot, List<ExecutorDetails>>();
-        if (executorToSlot != null) {
-            for (Entry<ExecutorDetails, WorkerSlot> entry : 
executorToSlot.entrySet()) {
-                List<ExecutorDetails> list = result.get(entry.getValue());
-                if (list == null) {
-                    list = new ArrayList<ExecutorDetails>();
-                    result.put(entry.getValue(), list);
-                }
-                list.add(entry.getKey());
-            }
-        }
-        return result;
+        return Utils.reverseMap(executorToSlot);
     }
 
     private static Map<ExecutorDetails, WorkerSlot> 
scheduleTopology(TopologyDetails topology, Cluster cluster) {
@@ -105,7 +94,7 @@ public class EvenScheduler implements IScheduler {
         int totalSlotsToUse = Math.min(topology.getNumWorkers(), 
availableSlots.size() + aliveAssigned.size());
 
         List<WorkerSlot> sortedList = sortSlots(availableSlots, cluster);
-        if (sortedList == null) {
+        if (sortedList == null || sortedList.size() < (totalSlotsToUse - 
aliveAssigned.size())) {
             LOG.error("Available slots are not enough for topology: {}", 
topology.getName());
             return new HashMap<ExecutorDetails, WorkerSlot>();
         }
@@ -122,25 +111,16 @@ public class EvenScheduler implements IScheduler {
             return reassignment;
         }
 
-        List<ExecutorDetails> _executors = new 
ArrayList<ExecutorDetails>(reassignExecutors);
-        Collections.sort(_executors, new Comparator<ExecutorDetails>() {
+        List<ExecutorDetails> executors = new 
ArrayList<ExecutorDetails>(reassignExecutors);
+        Collections.sort(executors, new Comparator<ExecutorDetails>() {
             @Override
             public int compare(ExecutorDetails o1, ExecutorDetails o2) {
                 return o1.getStartTask() - o2.getStartTask();
             }
         });
 
-        int numExecutors = _executors.size();
-        List<WorkerSlot> _slots = new ArrayList<WorkerSlot>(numExecutors);
-        int numSlots = reassignSlots.size();
-        for (int i = 0; i < numExecutors; i++) {
-            _slots.add(reassignSlots.get(i % numSlots));
-        }
-
-        Iterator<WorkerSlot> slotIterator = _slots.iterator();
-        Iterator<ExecutorDetails> executorIterator = _executors.iterator();
-        while (slotIterator.hasNext() && executorIterator.hasNext()) {
-            reassignment.put(executorIterator.next(), slotIterator.next());
+        for (int i = 0; i < executors.size(); i++) {
+            reassignment.put(executors.get(i), reassignSlots.get(i % 
reassignSlots.size()));
         }
 
         if (reassignment.size() != 0) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f9184624/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java 
b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index b1f59f2..ae3d387 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -2254,7 +2254,7 @@ public class Utils {
             List<T> first = new ArrayList<T>();
             List<List<T>> rest = new ArrayList<List<T>>();
             for (List<T> node : nodeList) {
-                if (null != node && node.size() > 0) {
+                if (node != null && node.size() > 0) {
                   first.add(node.get(0));
                   rest.add(node.subList(1, node.size()));
                 }

Reply via email to