Repository: storm
Updated Branches:
  refs/heads/master 89a349eb8 -> da7969ea9


[STORM-1300] port backtype.storm.scheduler.resource-aware-scheduler-test to 
java.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c1b93de1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c1b93de1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c1b93de1

Branch: refs/heads/master
Commit: c1b93de1650d113df0e1d0493780d9915bf3dacc
Parents: c2cf3be
Author: zhuol <[email protected]>
Authored: Fri Mar 18 16:06:00 2016 -0500
Committer: zhuol <[email protected]>
Committed: Fri Mar 18 16:06:00 2016 -0500

----------------------------------------------------------------------
 .../resource/TestResourceAwareScheduler.java    | 683 ++++++++++++++++++-
 .../TestUtilsForResourceAwareScheduler.java     |  73 +-
 2 files changed, 754 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c1b93de1/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
 
b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 78c73a1..e0336ea 100644
--- 
a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ 
b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.storm.scheduler.resource;
 
 import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ExecutorDetails;
@@ -29,11 +30,14 @@ import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.validation.ConfigValidation;
 
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +48,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Collection;
+import java.util.Collections;
 
 public class TestResourceAwareScheduler {
 
@@ -54,6 +61,680 @@ public class TestResourceAwareScheduler {
 
     private static int currentTime = 1450418597;
 
+    private static final Config defaultTopologyConf = new Config();
+
+
+    /**
+     * Fills {@code defaultTopologyConf} once, before any test in this class runs.
+     * The entries cover the network topography plugin, the three scheduler strategy
+     * classes, the per-component resource defaults, the worker heap limit, and the
+     * topology priority and submitter.
+     */
+    @BeforeClass
+    public static void initConf() {
+        // Strategy implementations are configured by fully qualified class name.
+        final String evictionStrategy =
+            org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName();
+        final String priorityStrategy =
+            org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName();
+        final String schedulingStrategy =
+            org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName();
+
+        defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN,
+            "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
+        defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, evictionStrategy);
+        defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, priorityStrategy);
+        defaultTopologyConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, schedulingStrategy);
+
+        // Per-component resource defaults and the worker heap ceiling.
+        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
+        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 128.0);
+        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0);
+        defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 8192.0);
+
+        // Topology identity.
+        defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0);
+        defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
+    }
+
+    /**
+     * Assigns one executor at a time from two topologies to the free slots of node
+     * "sup-0" and checks the node's slot bookkeeping after every assignment and again
+     * after all slots have been freed.
+     */
+    @Test
+    public void testRASNodeSlotAssign() {
+        INimbus nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> supervisorResources = new HashMap<>();
+        supervisorResources.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        supervisorResources.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supervisors =
+            TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, supervisorResources);
+        Topologies noTopologies = new Topologies(new HashMap<String, TopologyDetails>());
+        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), new HashMap());
+        Map<String, RAS_Node> nodesById = RAS_Nodes.getAllNodesFrom(cluster, noTopologies);
+        Assert.assertEquals(5, nodesById.size());
+
+        RAS_Node node = nodesById.get("sup-0");
+        Assert.assertEquals("sup-0", node.getId());
+        Assert.assertTrue(node.isAlive());
+        assertNodeSlotState(node, 0, 4, 0);
+
+        // first topology occupies two slots, one executor each
+        TopologyDetails topology1 =
+            TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 0, 2, 0, 0, 0);
+        assignToNextFreeSlot(node, topology1, 1);
+        assertNodeSlotState(node, 1, 3, 1);
+        assignToNextFreeSlot(node, topology1, 2);
+        assertNodeSlotState(node, 1, 2, 2);
+
+        // second topology occupies the remaining two slots
+        TopologyDetails topology2 =
+            TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 0, 2, 0, 0, 0);
+        assignToNextFreeSlot(node, topology2, 1);
+        assertNodeSlotState(node, 2, 1, 3);
+        assignToNextFreeSlot(node, topology2, 2);
+        assertNodeSlotState(node, 2, 0, 4);
+
+        node.freeAllSlots();
+        assertNodeSlotState(node, 0, 4, 0);
+    }
+
+    /** Assigns the single-task executor [executorId, executorId] of the topology to the node's next free slot. */
+    private static void assignToNextFreeSlot(RAS_Node node, TopologyDetails topology, int executorId) {
+        List<ExecutorDetails> executors = new ArrayList<>();
+        executors.add(new ExecutorDetails(executorId, executorId));
+        node.assign(node.getFreeSlots().iterator().next(), topology, executors);
+    }
+
+    /** Checks the running-topology count and the free/used/total slot counters of a four-slot node. */
+    private static void assertNodeSlotState(RAS_Node node, int runningTopologies, int freeSlots, int usedSlots) {
+        Assert.assertEquals(runningTopologies, node.getRunningTopologies().size());
+        if (usedSlots == 0) {
+            Assert.assertTrue(node.isTotallyFree());
+        } else {
+            Assert.assertFalse(node.isTotallyFree());
+        }
+        Assert.assertEquals(freeSlots, node.totalSlotsFree());
+        Assert.assertEquals(usedSlots, node.totalSlotsUsed());
+        Assert.assertEquals(4, node.totalSlots());
+    }
+
+    /**
+     * Schedules a single small topology on a one-supervisor cluster and checks that
+     * both of its executors end up in one worker slot on one node, with the
+     * fully-scheduled status recorded on the cluster.
+     */
+    @Test
+    public void sanityTestOfScheduling() {
+        INimbus nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> supervisorResources = new HashMap<>();
+        supervisorResources.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        supervisorResources.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supervisors =
+            TestUtilsForResourceAwareScheduler.genSupervisors(1, 2, supervisorResources);
+
+        Config conf = new Config();
+        conf.putAll(defaultTopologyConf);
+
+        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), conf);
+        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
+
+        TopologyDetails topology =
+            TestUtilsForResourceAwareScheduler.getTopology("topology1", conf, 1, 1, 1, 1, 0, 0);
+        Map<String, TopologyDetails> topologiesById = new HashMap<>();
+        topologiesById.put(topology.getId(), topology);
+        Topologies topologies = new Topologies(topologiesById);
+
+        scheduler.prepare(conf);
+        scheduler.schedule(topologies, cluster);
+
+        SchedulerAssignment assignment = cluster.getAssignmentById(topology.getId());
+        Set<WorkerSlot> usedSlots = assignment.getSlots();
+        Set<String> usedNodeIds = new HashSet<>();
+        for (WorkerSlot usedSlot : usedSlots) {
+            usedNodeIds.add(usedSlot.getNodeId());
+        }
+        Collection<ExecutorDetails> scheduledExecutors = assignment.getExecutors();
+
+        Assert.assertEquals(1, usedSlots.size());
+        Assert.assertEquals(1, usedNodeIds.size());
+        Assert.assertEquals(2, scheduledExecutors.size());
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy",
+            cluster.getStatusMap().get(topology.getId()));
+    }
+
+    /**
+     * Schedules two topologies together: one in which two spouts feed a chain of
+     * bolts, and one that consists of two unconnected spouts. Each is expected to be
+     * fully scheduled into a single worker slot.
+     */
+    @Test
+    public void testTopologyWithMultipleSpouts() {
+        INimbus nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> supervisorResources = new HashMap<>();
+        supervisorResources.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        supervisorResources.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supervisors =
+            TestUtilsForResourceAwareScheduler.genSupervisors(2, 4, supervisorResources);
+
+        // a topology with multiple spouts
+        TopologyBuilder multiSpoutBuilder = new TopologyBuilder();
+        multiSpoutBuilder.setSpout("wordSpout1", new TestWordSpout(), 1);
+        multiSpoutBuilder.setSpout("wordSpout2", new TestWordSpout(), 1);
+        multiSpoutBuilder.setBolt("wordCountBolt1", new TestWordCounter(), 1)
+            .shuffleGrouping("wordSpout1")
+            .shuffleGrouping("wordSpout2");
+        multiSpoutBuilder.setBolt("wordCountBolt2", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
+        multiSpoutBuilder.setBolt("wordCountBolt3", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
+        multiSpoutBuilder.setBolt("wordCountBolt4", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt2");
+        multiSpoutBuilder.setBolt("wordCountBolt5", new TestWordCounter(), 1).shuffleGrouping("wordSpout2");
+        StormTopology multiSpoutTopology = multiSpoutBuilder.createTopology();
+
+        Config conf = new Config();
+        conf.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> multiSpoutExecutors =
+            TestUtilsForResourceAwareScheduler.genExecsAndComps(multiSpoutTopology, 1, 1);
+        TopologyDetails topology1 =
+            new TopologyDetails("topology1", conf, multiSpoutTopology, 0, multiSpoutExecutors, 0);
+
+        // a topology with two unconnected partitions
+        TopologyBuilder partitionedBuilder = new TopologyBuilder();
+        partitionedBuilder.setSpout("wordSpoutX", new TestWordSpout(), 1);
+        partitionedBuilder.setSpout("wordSpoutY", new TestWordSpout(), 1);
+        StormTopology partitionedTopology = partitionedBuilder.createTopology();
+        Map<ExecutorDetails, String> partitionedExecutors =
+            TestUtilsForResourceAwareScheduler.genExecsAndComps(partitionedTopology, 1, 0);
+        TopologyDetails topology2 =
+            new TopologyDetails("topology2", conf, partitionedTopology, 0, partitionedExecutors, 0);
+
+        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), conf);
+        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
+
+        Map<String, TopologyDetails> topologiesById = new HashMap<>();
+        topologiesById.put(topology1.getId(), topology1);
+        topologiesById.put(topology2.getId(), topology2);
+        Topologies topologies = new Topologies(topologiesById);
+
+        scheduler.prepare(conf);
+        scheduler.schedule(topologies, cluster);
+
+        assertFullyScheduledInOneSlot(cluster, topology1, 7);
+        assertFullyScheduledInOneSlot(cluster, topology2, 2);
+    }
+
+    /**
+     * Checks that the topology's assignment uses exactly one slot on one node, holds
+     * the expected number of executors, and carries the fully-scheduled status.
+     */
+    private static void assertFullyScheduledInOneSlot(Cluster cluster, TopologyDetails topology, int expectedExecutors) {
+        SchedulerAssignment assignment = cluster.getAssignmentById(topology.getId());
+        Set<WorkerSlot> usedSlots = assignment.getSlots();
+        Set<String> usedNodeIds = new HashSet<>();
+        for (WorkerSlot usedSlot : usedSlots) {
+            usedNodeIds.add(usedSlot.getNodeId());
+        }
+        Collection<ExecutorDetails> scheduledExecutors = assignment.getExecutors();
+
+        Assert.assertEquals(1, usedSlots.size());
+        Assert.assertEquals(1, usedNodeIds.size());
+        Assert.assertEquals(expectedExecutors, scheduledExecutors.size());
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy",
+            cluster.getStatusMap().get(topology.getId()));
+    }
+
+    /**
+     * Schedules a spout/bolt topology whose components declare explicit CPU and memory
+     * loads and checks that the slot's allocated memory and CPU add up to the sum of
+     * the declared loads.
+     */
+    @Test
+    public void testTopologySetCpuAndMemLoad() {
+        INimbus nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> supervisorResources = new HashMap<>();
+        supervisorResources.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        supervisorResources.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supervisors =
+            TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, supervisorResources);
+
+        // one spout and one bolt, each asking for 20.0 CPU and 200.0 memory
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("wordSpout", new TestWordSpout(), 1)
+            .setCPULoad(20.0)
+            .setMemoryLoad(200.0);
+        builder.setBolt("wordCountBolt", new TestWordCounter(), 1)
+            .shuffleGrouping("wordSpout")
+            .setCPULoad(20.0)
+            .setMemoryLoad(200.0);
+        StormTopology stormTopology = builder.createTopology();
+
+        Config conf = new Config();
+        conf.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorToComponent =
+            TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology, 1, 1);
+        TopologyDetails topology = new TopologyDetails("topology1", conf, stormTopology, 0, executorToComponent, 0);
+
+        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), conf);
+        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
+        Map<String, TopologyDetails> topologiesById = new HashMap<>();
+        topologiesById.put(topology.getId(), topology);
+        Topologies topologies = new Topologies(topologiesById);
+
+        scheduler.prepare(conf);
+        scheduler.schedule(topologies, cluster);
+
+        SchedulerAssignment assignment = cluster.getAssignmentById(topology.getId());
+        Set<WorkerSlot> usedSlots = assignment.getSlots();
+        Set<String> usedNodeIds = new HashSet<>();
+        double totalMemory = 0.0;
+        double totalCpu = 0.0;
+        // sum what the scheduler recorded as allocated on every slot it used
+        for (WorkerSlot usedSlot : usedSlots) {
+            usedNodeIds.add(usedSlot.getNodeId());
+            totalMemory += usedSlot.getAllocatedMemOnHeap() + usedSlot.getAllocatedMemOffHeap();
+            totalCpu += usedSlot.getAllocatedCpu();
+        }
+        Collection<ExecutorDetails> scheduledExecutors = assignment.getExecutors();
+
+        Assert.assertEquals(1, usedSlots.size());
+        Assert.assertEquals(1, usedNodeIds.size());
+        Assert.assertEquals(2, scheduledExecutors.size());
+        Assert.assertEquals(400.0, totalMemory, 0.001);
+        Assert.assertEquals(40.0, totalCpu, 0.001);
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy",
+            cluster.getStatusMap().get(topology.getId()));
+    }
+
+    /**
+     * Schedules a topology whose three executors do not fit on a single supervisor and
+     * verifies that they are spread over two slots on two nodes, that each executor
+     * reports the CPU and memory it declared, and that no supervisor is asked for more
+     * CPU or memory than its capacity.
+     */
+    @Test
+    public void testResourceLimitation() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
+
+        // one spout with two executors feeding a single-executor bolt
+        TopologyBuilder builder1 = new TopologyBuilder();
+        builder1.setSpout("wordSpout", new TestWordSpout(), 2).setCPULoad(250.0).setMemoryLoad(1000.0, 200.0);
+        builder1.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(100.0).setMemoryLoad(500.0, 100.0);
+        StormTopology stormTopology1 = builder1.createTopology();
+
+        Config config = new Config();
+        config.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 2, 1);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 2, executorMap1, 0);
+
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        Topologies topologies = new Topologies(topoMap);
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
+        Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
+        Set<String> nodesIDs1 = new HashSet<>();
+        for (WorkerSlot slot : assignedSlots1) {
+            nodesIDs1.add(slot.getNodeId());
+        }
+        Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
+
+        // Per-executor requirements, sorted so they can be compared positionally.
+        List<Double> assignedExecutorMemory = new ArrayList<>();
+        List<Double> assignedExecutorCpu = new ArrayList<>();
+        for (ExecutorDetails executor : executors1) {
+            assignedExecutorMemory.add(topology1.getTotalMemReqTask(executor));
+            assignedExecutorCpu.add(topology1.getTotalCpuReqTask(executor));
+        }
+        Collections.sort(assignedExecutorCpu);
+        Collections.sort(assignedExecutorMemory);
+
+        // Group the scheduled executors by the id of the supervisor node that hosts them.
+        Map<String, List<ExecutorDetails>> nodeIdToExecutors = new HashMap<>();
+        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment1.getExecutorToSlot().entrySet()) {
+            String nodeId = entry.getValue().getNodeId();
+            List<ExecutorDetails> executorsOnNode = nodeIdToExecutors.get(nodeId);
+            if (executorsOnNode == null) {
+                executorsOnNode = new ArrayList<>();
+                nodeIdToExecutors.put(nodeId, executorsOnNode);
+            }
+            executorsOnNode.add(entry.getKey());
+        }
+
+        // one spout executor sits alone in one worker (on one node); the remaining
+        // two executors share another worker (on the other node)
+        Assert.assertEquals(2, assignedSlots1.size());
+        Assert.assertEquals(2, nodesIDs1.size());
+        Assert.assertEquals(3, executors1.size());
+
+        Assert.assertEquals(100.0, assignedExecutorCpu.get(0), 0.001);
+        Assert.assertEquals(250.0, assignedExecutorCpu.get(1), 0.001);
+        Assert.assertEquals(250.0, assignedExecutorCpu.get(2), 0.001);
+        Assert.assertEquals(600.0, assignedExecutorMemory.get(0), 0.001);
+        Assert.assertEquals(1200.0, assignedExecutorMemory.get(1), 0.001);
+        Assert.assertEquals(1200.0, assignedExecutorMemory.get(2), 0.001);
+
+        // Check every supervisor on its own. CPU demand is summed into the CPU total and
+        // memory demand into the memory total; keeping the totals per supervisor (rather
+        // than in a map keyed by capacity) ensures supervisors with identical capacity
+        // are each verified.
+        for (Map.Entry<String, List<ExecutorDetails>> entry : nodeIdToExecutors.entrySet()) {
+            SupervisorDetails supervisor = cluster.getSupervisorById(entry.getKey());
+            double supervisorUsedCpu = 0.0;
+            double supervisorUsedMemory = 0.0;
+            for (ExecutorDetails executor : entry.getValue()) {
+                supervisorUsedCpu += topology1.getTotalCpuReqTask(executor);
+                supervisorUsedMemory += topology1.getTotalMemReqTask(executor);
+            }
+            Assert.assertTrue("memory over-committed on " + entry.getKey(),
+                supervisor.getTotalMemory() - supervisorUsedMemory >= 0);
+            Assert.assertTrue("CPU over-committed on " + entry.getKey(),
+                supervisor.getTotalCPU() - supervisorUsedCpu >= 0);
+        }
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+    }
+
+    /**
+     * Runs four scenarios (marked Test1..Test4 below) against one ResourceAwareScheduler
+     * instance. In each scenario a copy of an executor-to-slot mapping is taken before a
+     * scheduling pass, and afterwards every executor in that copy is expected to still
+     * map to the same slot.
+     */
+    @Test
+    public void testScheduleResilience() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
+
+        TopologyBuilder builder1 = new TopologyBuilder();
+        builder1.setSpout("wordSpout1", new TestWordSpout(), 3);
+        StormTopology stormTopology1 = builder1.createTopology();
+        Config config1 = new Config();
+        config1.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 3, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 3, executorMap1, 0);
+
+        TopologyBuilder builder2 = new TopologyBuilder();
+        builder2.setSpout("wordSpout2", new TestWordSpout(), 2);
+        StormTopology stormTopology2 = builder2.createTopology();
+        Config config2 = new Config();
+        config2.putAll(defaultTopologyConf);
+        // memory requirement is large enough so that two executors can not be 
fully assigned to one node
+        config2.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
1280.0);
+        // NOTE(review): the executor map below is generated from stormTopology1 although
+        // it is handed to topology2, which is built from stormTopology2. This looks like
+        // a copy/paste slip - confirm whether stormTopology2 was intended.
+        Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 2, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 2, executorMap2, 0);
+
+        // Test1: When a worker fails, RAS does not alter existing assignments 
on healthy workers
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config1);
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        topoMap.put(topology2.getId(), topology2);
+        Topologies topologies = new Topologies(topoMap);
+
+        rs.prepare(config1);
+        rs.schedule(topologies, cluster);
+
+        SchedulerAssignmentImpl assignment = 
(SchedulerAssignmentImpl)cluster.getAssignmentById(topology2.getId());
+        // pick a worker to mock as failed
+        WorkerSlot failedWorker = new 
ArrayList<WorkerSlot>(assignment.getSlots()).get(0);
+        // The map returned here is modified in place below; presumably it is the
+        // assignment's own backing map, so the removals are visible to the scheduler
+        // on the next pass - confirm against SchedulerAssignmentImpl.
+        Map<ExecutorDetails, WorkerSlot> executorToSlot = 
assignment.getExecutorToSlot();
+        // collect first, remove afterwards, so the map is not modified while iterating
+        List<ExecutorDetails> failedExecutors = new ArrayList<>();
+        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : 
executorToSlot.entrySet()) {
+            if (entry.getValue().equals(failedWorker)) {
+                failedExecutors.add(entry.getKey());
+            }
+        }
+        for (ExecutorDetails executor : failedExecutors) {
+            executorToSlot.remove(executor); // remove executor details 
assigned to the failed worker
+        }
+        // snapshot of the surviving mapping, compared against the result of the next pass
+        Map<ExecutorDetails, WorkerSlot> copyOfOldMapping = new 
HashMap<>(executorToSlot);
+        Set<ExecutorDetails> healthyExecutors = copyOfOldMapping.keySet();
+
+        rs.schedule(topologies, cluster);
+        SchedulerAssignment newAssignment = 
cluster.getAssignmentById(topology2.getId());
+        Map<ExecutorDetails, WorkerSlot> newExecutorToSlot = 
newAssignment.getExecutorToSlot();
+
+        for (ExecutorDetails executor : healthyExecutors) {
+            Assert.assertEquals(copyOfOldMapping.get(executor), 
newExecutorToSlot.get(executor));
+        }
+        Assert.assertEquals("Running - Fully Scheduled by 
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
+        // end of Test1
+
+        // Test2: When a supervisor fails, RAS does not alter existing 
assignments
+        executorToSlot = new HashMap<>();
+        executorToSlot.put(new ExecutorDetails(0, 0), new WorkerSlot("sup-0", 
0));
+        executorToSlot.put(new ExecutorDetails(1, 1), new WorkerSlot("sup-0", 
1));
+        executorToSlot.put(new ExecutorDetails(2, 2), new WorkerSlot("sup-1", 
1));
+        Map<String, SchedulerAssignmentImpl> existingAssignments = new 
HashMap<>();
+        assignment = new SchedulerAssignmentImpl(topology1.getId(), 
executorToSlot);
+        existingAssignments.put(topology1.getId(), assignment);
+        copyOfOldMapping = new HashMap<>(executorToSlot);
+        Set<ExecutorDetails> existingExecutors = copyOfOldMapping.keySet();
+        Map<String, SupervisorDetails> supMap1 = new HashMap<>(supMap);
+        supMap1.remove("sup-0"); // mock the supervisor sup-0 as a failed 
supervisor
+        Cluster cluster1 = new Cluster(iNimbus, supMap1, existingAssignments, 
config1);
+
+        topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster1);
+
+        newAssignment = cluster1.getAssignmentById(topology1.getId());
+        newExecutorToSlot = newAssignment.getExecutorToSlot();
+
+        for (ExecutorDetails executor : existingExecutors) {
+            Assert.assertEquals(copyOfOldMapping.get(executor), 
newExecutorToSlot.get(executor));
+        }
+        // this scenario expects the shorter "Fully Scheduled" status text
+        Assert.assertEquals("Fully Scheduled", 
cluster1.getStatusMap().get(topology1.getId()));
+        // end of Test2
+
+        // Test3: When a supervisor and a worker on it fails, RAS does not 
alter existing assignments
+        executorToSlot = new HashMap<>();
+        executorToSlot.put(new ExecutorDetails(0, 0), new WorkerSlot("sup-0", 
1)); // the worker to orphan
+        executorToSlot.put(new ExecutorDetails(1, 1), new WorkerSlot("sup-0", 
2)); // the worker that fails
+        executorToSlot.put(new ExecutorDetails(2, 2), new WorkerSlot("sup-1", 
1)); // the healthy worker
+        existingAssignments = new HashMap<>();
+        assignment = new SchedulerAssignmentImpl(topology1.getId(), 
executorToSlot);
+        existingAssignments.put(topology1.getId(), assignment);
+        // delete one worker of sup-0 (failed) from topo1 assignment to enable 
actual schedule for testing
+        executorToSlot.remove(new ExecutorDetails(1, 1));
+
+        copyOfOldMapping = new HashMap<>(executorToSlot);
+        existingExecutors = copyOfOldMapping.keySet(); // namely the two eds 
on the orphaned worker and the healthy worker
+        supMap1 = new HashMap<>(supMap);
+        supMap1.remove("sup-0"); // mock the supervisor sup-0 as a failed 
supervisor
+        cluster1 = new Cluster(iNimbus, supMap1, existingAssignments, config1);
+
+        topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster1);
+
+        newAssignment = cluster1.getAssignmentById(topology1.getId());
+        newExecutorToSlot = newAssignment.getExecutorToSlot();
+
+        for (ExecutorDetails executor : existingExecutors) {
+            Assert.assertEquals(copyOfOldMapping.get(executor), 
newExecutorToSlot.get(executor));
+        }
+        Assert.assertEquals("Fully Scheduled", 
cluster1.getStatusMap().get(topology1.getId()));
+        // end of Test3
+
+        // Test4: Scheduling a new topology does not disturb other assignments 
unnecessarily
+        cluster1 = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config1);
+        topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster1);
+        assignment = 
(SchedulerAssignmentImpl)cluster1.getAssignmentById(topology1.getId());
+        executorToSlot = assignment.getExecutorToSlot();
+        copyOfOldMapping = new HashMap<>(executorToSlot);
+
+        // second pass: topology2 is added next to the already scheduled topology1
+        topoMap.put(topology2.getId(), topology2);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster1);
+
+        newAssignment = 
(SchedulerAssignmentImpl)cluster1.getAssignmentById(topology1.getId());
+        newExecutorToSlot = newAssignment.getExecutorToSlot();
+
+        for (ExecutorDetails executor : copyOfOldMapping.keySet()) {
+            Assert.assertEquals(copyOfOldMapping.get(executor), 
newExecutorToSlot.get(executor));
+        }
+        Assert.assertEquals("Running - Fully Scheduled by 
DefaultResourceAwareStrategy", cluster1.getStatusMap().get(topology1.getId()));
+        Assert.assertEquals("Running - Fully Scheduled by 
DefaultResourceAwareStrategy", cluster1.getStatusMap().get(topology2.getId()));
+    }
+
+    /**
+     * Exercises scheduling on a cluster made of one strong supervisor
+     * (800.0 CPU, 4096.0 MB) and one weak supervisor (200.0 CPU, 1024.0 MB),
+     * each offering four ports. Three scenarios are run, each against a fresh
+     * {@link Cluster}:
+     * <ol>
+     *   <li>topologies 1-3 together: all three must be fully scheduled and
+     *       every supervisor must have either its memory or its CPU used up;</li>
+     *   <li>topologies 1, 2 and 4 together: exactly two of them must end up
+     *       fully scheduled;</li>
+     *   <li>topology 5 alone: both the CPU and the memory of every supervisor
+     *       must be used up.</li>
+     * </ol>
+     */
+    @Test
+    public void testHeterogeneousCluster() {
+        // Status string reported for a topology whose executors were all placed.
+        final String fullyScheduled = "Running - Fully Scheduled by DefaultResourceAwareStrategy";
+        // Tolerance used for every floating-point resource comparison below.
+        final double epsilon = 0.0001;
+
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap1 = new HashMap<>(); // strong supervisor node
+        resourceMap1.put(Config.SUPERVISOR_CPU_CAPACITY, 800.0);
+        resourceMap1.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 4096.0);
+        Map<String, Number> resourceMap2 = new HashMap<>(); // weak supervisor node
+        resourceMap2.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
+        resourceMap2.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0);
+
+        // sup-0 is the strong node, sup-1 the weak one; both expose ports 0..3.
+        Map<String, SupervisorDetails> supMap = new HashMap<String, SupervisorDetails>();
+        for (int i = 0; i < 2; i++) {
+            List<Number> ports = new LinkedList<Number>();
+            for (int j = 0; j < 4; j++) {
+                ports.add(j);
+            }
+            SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports,
+                    (Map) (i == 0 ? resourceMap1 : resourceMap2));
+            supMap.put(sup.getId(), sup);
+        }
+
+        // topo1 has one single huge task that can not be handled by the small-super
+        TopologyBuilder builder1 = new TopologyBuilder();
+        builder1.setSpout("wordSpout1", new TestWordSpout(), 1)
+                .setCPULoad(300.0).setMemoryLoad(2000.0, 48.0);
+        StormTopology stormTopology1 = builder1.createTopology();
+        Config config1 = new Config();
+        config1.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap1 =
+                TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 0);
+        TopologyDetails topology1 =
+                new TopologyDetails("topology1", config1, stormTopology1, 1, executorMap1, 0);
+
+        // topo2 has 4 large tasks
+        TopologyBuilder builder2 = new TopologyBuilder();
+        builder2.setSpout("wordSpout2", new TestWordSpout(), 4)
+                .setCPULoad(100.0).setMemoryLoad(500.0, 12.0);
+        StormTopology stormTopology2 = builder2.createTopology();
+        Config config2 = new Config();
+        config2.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap2 =
+                TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 4, 0);
+        TopologyDetails topology2 =
+                new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0);
+
+        // topo3 has 4 tasks with a light CPU load (20.0 each)
+        TopologyBuilder builder3 = new TopologyBuilder();
+        builder3.setSpout("wordSpout3", new TestWordSpout(), 4)
+                .setCPULoad(20.0).setMemoryLoad(200.0, 56.0);
+        StormTopology stormTopology3 = builder3.createTopology();
+        Config config3 = new Config();
+        config3.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap3 =
+                TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology3, 4, 0);
+        // Use topology3's own config; the original passed config2 here by mistake.
+        TopologyDetails topology3 =
+                new TopologyDetails("topology3", config3, stormTopology3, 1, executorMap3, 0);
+
+        // topo4 has 12 small tasks, whose mem usage does not exactly divide a node's mem capacity
+        TopologyBuilder builder4 = new TopologyBuilder();
+        builder4.setSpout("wordSpout4", new TestWordSpout(), 12)
+                .setCPULoad(30.0).setMemoryLoad(100.0, 0.0);
+        StormTopology stormTopology4 = builder4.createTopology();
+        Config config4 = new Config();
+        config4.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap4 =
+                TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology4, 12, 0);
+        TopologyDetails topology4 =
+                new TopologyDetails("topology4", config4, stormTopology4, 1, executorMap4, 0);
+
+        // topo5 has 40 small tasks, it should be able to exactly use up both the cpu and mem
+        // in the cluster
+        TopologyBuilder builder5 = new TopologyBuilder();
+        builder5.setSpout("wordSpout5", new TestWordSpout(), 40)
+                .setCPULoad(25.0).setMemoryLoad(100.0, 28.0);
+        StormTopology stormTopology5 = builder5.createTopology();
+        Config config5 = new Config();
+        config5.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap5 =
+                TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology5, 40, 0);
+        TopologyDetails topology5 =
+                new TopologyDetails("topology5", config5, stormTopology5, 1, executorMap5, 0);
+
+        // Test1: Launch topo 1-3 together, it should be able to use up either mem or cpu
+        // resource due to exact division
+        Cluster cluster = new Cluster(iNimbus, supMap,
+                new HashMap<String, SchedulerAssignmentImpl>(), config1);
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        topoMap.put(topology2.getId(), topology2);
+        topoMap.put(topology3.getId(), topology3);
+        Topologies topologies = new Topologies(topoMap);
+        rs.prepare(config1);
+        rs.schedule(topologies, cluster);
+
+        Assert.assertEquals(fullyScheduled, cluster.getStatusMap().get(topology1.getId()));
+        Assert.assertEquals(fullyScheduled, cluster.getStatusMap().get(topology2.getId()));
+        Assert.assertEquals(fullyScheduled, cluster.getStatusMap().get(topology3.getId()));
+
+        Map<SupervisorDetails, Double> superToCpu =
+                TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
+        Map<SupervisorDetails, Double> superToMem =
+                TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
+
+        for (SupervisorDetails supervisor : supMap.values()) {
+            Double cpuAvailable = supervisor.getTotalCPU();
+            Double memAvailable = supervisor.getTotalMemory();
+            Double cpuUsed = superToCpu.get(supervisor);
+            Double memUsed = superToMem.get(supervisor);
+            // At least one of the two resources must be exhausted on every node.
+            Assert.assertTrue((Math.abs(memAvailable - memUsed) < epsilon)
+                    || (Math.abs(cpuAvailable - cpuUsed) < epsilon));
+        }
+        // end of Test1
+
+        // Test2: Launch topo 1, 2 and 4, they together request a little more mem than
+        // available, so one of the 3 topos will not be scheduled
+        cluster = new Cluster(iNimbus, supMap,
+                new HashMap<String, SchedulerAssignmentImpl>(), config1);
+        topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        topoMap.put(topology2.getId(), topology2);
+        topoMap.put(topology4.getId(), topology4);
+        topologies = new Topologies(topoMap);
+        rs.prepare(config1);
+        rs.schedule(topologies, cluster);
+        int numTopologiesAssigned = 0;
+        for (String topoId : topoMap.keySet()) {
+            // Constant-first equals: a topology without a status entry counts as
+            // "not assigned" instead of aborting the test with a NullPointerException.
+            if (fullyScheduled.equals(cluster.getStatusMap().get(topoId))) {
+                numTopologiesAssigned++;
+            }
+        }
+        Assert.assertEquals(2, numTopologiesAssigned);
+        // end of Test2
+
+        // Test3: Launch topo5 only, both mem and cpu should be exactly used up
+        cluster = new Cluster(iNimbus, supMap,
+                new HashMap<String, SchedulerAssignmentImpl>(), config1);
+        topoMap = new HashMap<>();
+        topoMap.put(topology5.getId(), topology5);
+        topologies = new Topologies(topoMap);
+        rs.prepare(config1);
+        rs.schedule(topologies, cluster);
+        superToCpu =
+                TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
+        superToMem =
+                TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
+        for (SupervisorDetails supervisor : supMap.values()) {
+            Double cpuAvailable = supervisor.getTotalCPU();
+            Double memAvailable = supervisor.getTotalMemory();
+            Double cpuUsed = superToCpu.get(supervisor);
+            Double memUsed = superToMem.get(supervisor);
+            Assert.assertEquals(cpuAvailable, cpuUsed, epsilon);
+            Assert.assertEquals(memAvailable, memUsed, epsilon);
+        }
+        // end of Test3
+    }
+
+    @Test
+    public void testTopologyWorkerMaxHeapSize() {
+        // Test1: If RAS spreads executors across multiple workers based on 
the set limit for a worker used by the topology
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
+
+        TopologyBuilder builder1 = new TopologyBuilder();
+        builder1.setSpout("wordSpout1", new TestWordSpout(), 4);
+        StormTopology stormTopology1 = builder1.createTopology();
+        Config config1 = new Config();
+        config1.putAll(defaultTopologyConf);
+        config1.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
+        Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 4, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 1, executorMap1, 0);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config1);
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        Topologies topologies = new Topologies(topoMap);
+        rs.prepare(config1);
+        rs.schedule(topologies, cluster);
+        Assert.assertEquals("Running - Fully Scheduled by 
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+        Assert.assertEquals(4, cluster.getAssignedNumWorkers(topology1));
+
+        // Test2: test when no more workers are available due to topology 
worker max heap size limit but there is memory is still available
+        // wordSpout2 is going to contain 5 executors that needs scheduling. 
Each of those executors has a memory requirement of 128.0 MB
+        // The cluster contains 4 free WorkerSlots. For this topolology each 
worker is limited to a max heap size of 128.0
+        // Thus, one executor not going to be able to get scheduled thus 
failing the scheduling of this topology and no executors of this topology will 
be scheduleded
+        TopologyBuilder builder2 = new TopologyBuilder();
+        builder2.setSpout("wordSpout2", new TestWordSpout(), 5);
+        StormTopology stormTopology2 = builder2.createTopology();
+        Config config2 = new Config();
+        config2.putAll(defaultTopologyConf);
+        config2.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
+        Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 5, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 1, executorMap2, 0);
+        cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config2);
+        topoMap = new HashMap<>();
+        topoMap.put(topology2.getId(), topology2);
+        topologies = new Topologies(topoMap);
+        rs.prepare(config2);
+        rs.schedule(topologies, cluster);
+        Assert.assertEquals("Not enough resources to schedule - 0/5 executors 
scheduled", cluster.getStatusMap().get(topology2.getId()));
+        Assert.assertEquals(5, 
cluster.getUnassignedExecutors(topology2).size());
+    }
+
+    /**
+     * Submits a topology whose configured per-component on-heap memory (129.0)
+     * is larger than the configured worker max heap size (128.0) and expects
+     * the submission to be rejected with an {@link IllegalArgumentException}.
+     */
+    @Test(expected=IllegalArgumentException.class)
+    public void testMemoryLoadLargerThanMaxHeapSize() throws Exception {
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout("wordSpout1", new TestWordSpout(), 4);
+        StormTopology topology = topologyBuilder.createTopology();
+
+        // TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB (128.0) is smaller than the largest memory
+        // requirement of a component in the topology (129.0), so the topology cannot
+        // be scheduled successfully.
+        Config conf = new Config();
+        conf.putAll(defaultTopologyConf);
+        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
+        conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 129.0);
+
+        StormSubmitter.submitTopologyWithProgressBar("test", conf, topology);
+    }
+
     @Test
     public void TestReadInResourceAwareSchedulerUserPools() {
         Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", 
false);

http://git-wip-us.apache.org/repos/asf/storm/blob/c1b93de1/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
 
b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index f21645b..7cd21ce 100644
--- 
a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ 
b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -29,6 +29,8 @@ import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -50,6 +52,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ArrayList;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -109,7 +112,7 @@ public class TestUtilsForResourceAwareScheduler {
     public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology 
topology, int spoutParallelism, int boltParallelism) {
         Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, 
String>();
         int startTask = 0;
-        int endTask = 1;
+        int endTask = 0;
         for (Map.Entry<String, SpoutSpec> entry : 
topology.get_spouts().entrySet()) {
             for (int i = 0; i < spoutParallelism; i++) {
                 retMap.put(new ExecutorDetails(startTask, endTask), 
entry.getKey());
@@ -285,4 +288,72 @@ public class TestUtilsForResourceAwareScheduler {
         }
         return ret;
     }
+
+    public static Map<SupervisorDetails, Double> 
getSupervisorToMemoryUsage(Cluster cluster, Topologies topologies) {
+        Map<SupervisorDetails, Double> superToMem = new HashMap<>();
+        Collection<SchedulerAssignment> assignments = 
cluster.getAssignments().values();
+        Collection<SupervisorDetails> supervisors = 
cluster.getSupervisors().values();
+        for (SupervisorDetails supervisor : supervisors) {
+            superToMem.put(supervisor, 0.0);
+        }
+
+        for (SchedulerAssignment assignment : assignments) {
+            Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new 
HashMap<>();
+            Map<SupervisorDetails, List<ExecutorDetails>> 
supervisorToExecutors = new HashMap<>();
+            TopologyDetails topology = 
topologies.getById(assignment.getTopologyId());
+            for (Map.Entry<ExecutorDetails, WorkerSlot> entry : 
assignment.getExecutorToSlot().entrySet()) {
+                executorToSupervisor.put(entry.getKey(), 
cluster.getSupervisorById(entry.getValue().getNodeId()));
+            }
+            for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : 
executorToSupervisor.entrySet()) {
+                List<ExecutorDetails> executorsOnSupervisor = 
supervisorToExecutors.get(entry.getValue());
+                if (executorsOnSupervisor == null) {
+                    executorsOnSupervisor = new ArrayList<>();
+                    supervisorToExecutors.put(entry.getValue(), 
executorsOnSupervisor);
+                }
+                executorsOnSupervisor.add(entry.getKey());
+            }
+            for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : 
supervisorToExecutors.entrySet()) {
+                Double supervisorUsedMemory = 0.0;
+                for (ExecutorDetails executor: entry.getValue()) {
+                    supervisorUsedMemory += 
topology.getTotalMemReqTask(executor);
+                }
+                superToMem.put(entry.getKey(), superToMem.get(entry.getKey()) 
+ supervisorUsedMemory);
+            }
+        }
+        return superToMem;
+    }
+
+    public static Map<SupervisorDetails, Double> 
getSupervisorToCpuUsage(Cluster cluster, Topologies topologies) {
+        Map<SupervisorDetails, Double> superToCpu = new HashMap<>();
+        Collection<SchedulerAssignment> assignments = 
cluster.getAssignments().values();
+        Collection<SupervisorDetails> supervisors = 
cluster.getSupervisors().values();
+        for (SupervisorDetails supervisor : supervisors) {
+            superToCpu.put(supervisor, 0.0);
+        }
+
+        for (SchedulerAssignment assignment : assignments) {
+            Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new 
HashMap<>();
+            Map<SupervisorDetails, List<ExecutorDetails>> 
supervisorToExecutors = new HashMap<>();
+            TopologyDetails topology = 
topologies.getById(assignment.getTopologyId());
+            for (Map.Entry<ExecutorDetails, WorkerSlot> entry : 
assignment.getExecutorToSlot().entrySet()) {
+                executorToSupervisor.put(entry.getKey(), 
cluster.getSupervisorById(entry.getValue().getNodeId()));
+            }
+            for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : 
executorToSupervisor.entrySet()) {
+                List<ExecutorDetails> executorsOnSupervisor = 
supervisorToExecutors.get(entry.getValue());
+                if (executorsOnSupervisor == null) {
+                    executorsOnSupervisor = new ArrayList<>();
+                    supervisorToExecutors.put(entry.getValue(), 
executorsOnSupervisor);
+                }
+                executorsOnSupervisor.add(entry.getKey());
+            }
+            for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : 
supervisorToExecutors.entrySet()) {
+                Double supervisorUsedCpu = 0.0;
+                for (ExecutorDetails executor: entry.getValue()) {
+                    supervisorUsedCpu += topology.getTotalCpuReqTask(executor);
+                }
+                superToCpu.put(entry.getKey(), superToCpu.get(entry.getKey()) 
+ supervisorUsedCpu);
+            }
+        }
+        return superToCpu;
+    }
 }

Reply via email to