Repository: storm Updated Branches: refs/heads/master 89a349eb8 -> da7969ea9
[STORM-1300] port backtype.storm.scheduler.resource-aware-scheduler-test to java. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c1b93de1 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c1b93de1 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c1b93de1 Branch: refs/heads/master Commit: c1b93de1650d113df0e1d0493780d9915bf3dacc Parents: c2cf3be Author: zhuol <[email protected]> Authored: Fri Mar 18 16:06:00 2016 -0500 Committer: zhuol <[email protected]> Committed: Fri Mar 18 16:06:00 2016 -0500 ---------------------------------------------------------------------- .../resource/TestResourceAwareScheduler.java | 683 ++++++++++++++++++- .../TestUtilsForResourceAwareScheduler.java | 73 +- 2 files changed, 754 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/c1b93de1/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java index 78c73a1..e0336ea 100644 --- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java +++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java @@ -19,6 +19,7 @@ package org.apache.storm.scheduler.resource; import org.apache.storm.Config; +import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.scheduler.Cluster; import org.apache.storm.scheduler.ExecutorDetails; @@ -29,11 +30,14 @@ import org.apache.storm.scheduler.SupervisorDetails; import org.apache.storm.scheduler.Topologies; import org.apache.storm.scheduler.TopologyDetails; import 
org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.testing.TestWordCounter; +import org.apache.storm.testing.TestWordSpout; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; import org.apache.storm.validation.ConfigValidation; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +48,10 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; - +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Collection; +import java.util.Collections; public class TestResourceAwareScheduler { @@ -54,6 +61,680 @@ public class TestResourceAwareScheduler { private static int currentTime = 1450418597; + private static final Config defaultTopologyConf = new Config(); + + + @BeforeClass + public static void initConf() { + defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"); + defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName()); + defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName()); + + defaultTopologyConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName()); + defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0); + defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 128.0); + defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0); + defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 8192.0); + defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0); + 
defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo"); + } + + @Test + public void testRASNodeSlotAssign() { + INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest(); + Map<String, Number> resourceMap = new HashMap<>(); + resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0); + resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0); + Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, resourceMap); + Topologies topologies = new Topologies(new HashMap<String, TopologyDetails>()); + Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), new HashMap()); + Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies); + Assert.assertEquals(5, nodes.size()); + RAS_Node node = nodes.get("sup-0"); + + Assert.assertEquals("sup-0", node.getId()); + Assert.assertTrue(node.isAlive()); + Assert.assertEquals(0, node.getRunningTopologies().size()); + Assert.assertTrue(node.isTotallyFree()); + Assert.assertEquals(4, node.totalSlotsFree()); + Assert.assertEquals(0, node.totalSlotsUsed()); + Assert.assertEquals(4, node.totalSlots()); + + TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 0, 2, 0, 0, 0); + + List<ExecutorDetails> executors11 = new ArrayList<>(); + executors11.add(new ExecutorDetails(1, 1)); + node.assign(node.getFreeSlots().iterator().next(), topology1, executors11); + Assert.assertEquals(1, node.getRunningTopologies().size()); + Assert.assertFalse(node.isTotallyFree()); + Assert.assertEquals(3, node.totalSlotsFree()); + Assert.assertEquals(1, node.totalSlotsUsed()); + Assert.assertEquals(4, node.totalSlots()); + + List<ExecutorDetails> executors12 = new ArrayList<>(); + executors12.add(new ExecutorDetails(2, 2)); + node.assign(node.getFreeSlots().iterator().next(), topology1, executors12); + Assert.assertEquals(1, node.getRunningTopologies().size()); + 
Assert.assertFalse(node.isTotallyFree()); + Assert.assertEquals(2, node.totalSlotsFree()); + Assert.assertEquals(2, node.totalSlotsUsed()); + Assert.assertEquals(4, node.totalSlots()); + + TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 0, 2, 0, 0, 0); + + List<ExecutorDetails> executors21 = new ArrayList<>(); + executors21.add(new ExecutorDetails(1, 1)); + node.assign(node.getFreeSlots().iterator().next(), topology2, executors21); + Assert.assertEquals(2, node.getRunningTopologies().size()); + Assert.assertFalse(node.isTotallyFree()); + Assert.assertEquals(1, node.totalSlotsFree()); + Assert.assertEquals(3, node.totalSlotsUsed()); + Assert.assertEquals(4, node.totalSlots()); + + List<ExecutorDetails> executors22 = new ArrayList<>(); + executors22.add(new ExecutorDetails(2, 2)); + node.assign(node.getFreeSlots().iterator().next(), topology2, executors22); + Assert.assertEquals(2, node.getRunningTopologies().size()); + Assert.assertFalse(node.isTotallyFree()); + Assert.assertEquals(0, node.totalSlotsFree()); + Assert.assertEquals(4, node.totalSlotsUsed()); + Assert.assertEquals(4, node.totalSlots()); + + node.freeAllSlots(); + Assert.assertEquals(0, node.getRunningTopologies().size()); + Assert.assertTrue(node.isTotallyFree()); + Assert.assertEquals(4, node.totalSlotsFree()); + Assert.assertEquals(0, node.totalSlotsUsed()); + Assert.assertEquals(4, node.totalSlots()); + } + + @Test + public void sanityTestOfScheduling() { + INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest(); + Map<String, Number> resourceMap = new HashMap<>(); + resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0); + resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0); + Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(1, 2, resourceMap); + + Config config = new Config(); + config.putAll(defaultTopologyConf); + + Cluster cluster = new Cluster(iNimbus, supMap, new 
HashMap<String, SchedulerAssignmentImpl>(), config); + ResourceAwareScheduler rs = new ResourceAwareScheduler(); + + TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 0, 0); + Map<String, TopologyDetails> topoMap = new HashMap<>(); + topoMap.put(topology1.getId(), topology1); + Topologies topologies = new Topologies(topoMap); + + rs.prepare(config); + rs.schedule(topologies, cluster); + + SchedulerAssignment assignment = cluster.getAssignmentById(topology1.getId()); + Set<WorkerSlot> assignedSlots = assignment.getSlots(); + Set<String> nodesIDs = new HashSet<>(); + for (WorkerSlot slot : assignedSlots) { + nodesIDs.add(slot.getNodeId()); + } + Collection<ExecutorDetails> executors = assignment.getExecutors(); + + Assert.assertEquals(1, assignedSlots.size()); + Assert.assertEquals(1, nodesIDs.size()); + Assert.assertEquals(2, executors.size()); + Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId())); + } + + @Test + public void testTopologyWithMultipleSpouts() { + INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest(); + Map<String, Number> resourceMap = new HashMap<>(); + resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0); + resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0); + Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 4, resourceMap); + + TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts + builder1.setSpout("wordSpout1", new TestWordSpout(), 1); + builder1.setSpout("wordSpout2", new TestWordSpout(), 1); + builder1.setBolt("wordCountBolt1", new TestWordCounter(), 1).shuffleGrouping("wordSpout1").shuffleGrouping("wordSpout2"); + builder1.setBolt("wordCountBolt2", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1"); + builder1.setBolt("wordCountBolt3", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1"); + 
builder1.setBolt("wordCountBolt4", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt2"); + builder1.setBolt("wordCountBolt5", new TestWordCounter(), 1).shuffleGrouping("wordSpout2"); + StormTopology stormTopology1 = builder1.createTopology(); + + Config config = new Config(); + config.putAll(defaultTopologyConf); + Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1); + TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0); + + TopologyBuilder builder2 = new TopologyBuilder(); // a topology with two unconnected partitions + builder2.setSpout("wordSpoutX", new TestWordSpout(), 1); + builder2.setSpout("wordSpoutY", new TestWordSpout(), 1); + StormTopology stormTopology2 = builder2.createTopology(); + Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 1, 0); + TopologyDetails topology2 = new TopologyDetails("topology2", config, stormTopology2, 0, executorMap2, 0); + + Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); + ResourceAwareScheduler rs = new ResourceAwareScheduler(); + + Map<String, TopologyDetails> topoMap = new HashMap<>(); + topoMap.put(topology1.getId(), topology1); + topoMap.put(topology2.getId(), topology2); + Topologies topologies = new Topologies(topoMap); + + rs.prepare(config); + rs.schedule(topologies, cluster); + + SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId()); + Set<WorkerSlot> assignedSlots1 = assignment1.getSlots(); + Set<String> nodesIDs1 = new HashSet<>(); + for (WorkerSlot slot : assignedSlots1) { + nodesIDs1.add(slot.getNodeId()); + } + Collection<ExecutorDetails> executors1 = assignment1.getExecutors(); + + Assert.assertEquals(1, assignedSlots1.size()); + Assert.assertEquals(1, nodesIDs1.size()); + Assert.assertEquals(7, executors1.size()); + Assert.assertEquals("Running - 
Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId())); + + SchedulerAssignment assignment2 = cluster.getAssignmentById(topology2.getId()); + Set<WorkerSlot> assignedSlots2 = assignment2.getSlots(); + Set<String> nodesIDs2 = new HashSet<>(); + for (WorkerSlot slot : assignedSlots2) { + nodesIDs2.add(slot.getNodeId()); + } + Collection<ExecutorDetails> executors2 = assignment2.getExecutors(); + + Assert.assertEquals(1, assignedSlots2.size()); + Assert.assertEquals(1, nodesIDs2.size()); + Assert.assertEquals(2, executors2.size()); + Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId())); + } + + @Test + public void testTopologySetCpuAndMemLoad() { + INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest(); + Map<String, Number> resourceMap = new HashMap<>(); + resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0); + resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0); + Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap); + + TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts + builder1.setSpout("wordSpout", new TestWordSpout(), 1).setCPULoad(20.0).setMemoryLoad(200.0); + builder1.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(20.0).setMemoryLoad(200.0); + StormTopology stormTopology1 = builder1.createTopology(); + + Config config = new Config(); + config.putAll(defaultTopologyConf); + Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1); + TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0); + + Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); + ResourceAwareScheduler rs = new ResourceAwareScheduler(); + Map<String, 
TopologyDetails> topoMap = new HashMap<>(); + topoMap.put(topology1.getId(), topology1); + Topologies topologies = new Topologies(topoMap); + + rs.prepare(config); + rs.schedule(topologies, cluster); + + SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId()); + Set<WorkerSlot> assignedSlots1 = assignment1.getSlots(); + double assignedMemory = 0.0; + double assignedCpu = 0.0; + Set<String> nodesIDs1 = new HashSet<>(); + for (WorkerSlot slot : assignedSlots1) { + nodesIDs1.add(slot.getNodeId()); + assignedMemory += slot.getAllocatedMemOnHeap() + slot.getAllocatedMemOffHeap(); + assignedCpu += slot.getAllocatedCpu(); + + } + Collection<ExecutorDetails> executors1 = assignment1.getExecutors(); + + Assert.assertEquals(1, assignedSlots1.size()); + Assert.assertEquals(1, nodesIDs1.size()); + Assert.assertEquals(2, executors1.size()); + Assert.assertEquals(400.0, assignedMemory, 0.001); + Assert.assertEquals(40.0, assignedCpu, 0.001); + Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId())); + } + + @Test + public void testResourceLimitation() { + INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest(); + Map<String, Number> resourceMap = new HashMap<>(); + resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0); + resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0); + Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap); + + TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts + builder1.setSpout("wordSpout", new TestWordSpout(), 2).setCPULoad(250.0).setMemoryLoad(1000.0, 200.0); + builder1.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(100.0).setMemoryLoad(500.0, 100.0); + StormTopology stormTopology1 = builder1.createTopology(); + + Config config = new Config(); + config.putAll(defaultTopologyConf); + Map<ExecutorDetails, 
String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 2, 1); + TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 2, executorMap1, 0); + + Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); + ResourceAwareScheduler rs = new ResourceAwareScheduler(); + Map<String, TopologyDetails> topoMap = new HashMap<>(); + topoMap.put(topology1.getId(), topology1); + Topologies topologies = new Topologies(topoMap); + + rs.prepare(config); + rs.schedule(topologies, cluster); + + SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId()); + Set<WorkerSlot> assignedSlots1 = assignment1.getSlots(); + Set<String> nodesIDs1 = new HashSet<>(); + for (WorkerSlot slot : assignedSlots1) { + nodesIDs1.add(slot.getNodeId()); + } + Collection<ExecutorDetails> executors1 = assignment1.getExecutors(); + List<Double> assignedExecutorMemory = new ArrayList<>(); + List<Double> assignedExecutorCpu = new ArrayList<>(); + for (ExecutorDetails executor : executors1) { + assignedExecutorMemory.add(topology1.getTotalMemReqTask(executor)); + assignedExecutorCpu.add(topology1.getTotalCpuReqTask(executor)); + } + Collections.sort(assignedExecutorCpu); + Collections.sort(assignedExecutorMemory); + + Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>(); + Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>(); + Map<Double, Double> cpuAvailableToUsed = new HashMap(); + Map<Double, Double> memoryAvailableToUsed = new HashMap(); + + for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment1.getExecutorToSlot().entrySet()) { + executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId())); + } + for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) { + List<ExecutorDetails> executorsOnSupervisor = 
supervisorToExecutors.get(entry.getValue()); + if (executorsOnSupervisor == null) { + executorsOnSupervisor = new ArrayList<>(); + supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor); + } + executorsOnSupervisor.add(entry.getKey()); + } + for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) { + Double supervisorTotalCpu = entry.getKey().getTotalCPU(); + Double supervisorTotalMemory = entry.getKey().getTotalMemory(); + Double supervisorUsedCpu = 0.0; + Double supervisorUsedMemory = 0.0; + for (ExecutorDetails executor: entry.getValue()) { + supervisorUsedMemory += topology1.getTotalCpuReqTask(executor); + supervisorTotalCpu += topology1.getTotalMemReqTask(executor); + } + cpuAvailableToUsed.put(supervisorTotalCpu, supervisorUsedCpu); + memoryAvailableToUsed.put(supervisorTotalMemory, supervisorUsedMemory); + } + // executor0 resides on one worker (on one node), executor1 and executor2 on another worker (on the other node) + Assert.assertEquals(2, assignedSlots1.size()); + Assert.assertEquals(2, nodesIDs1.size()); + Assert.assertEquals(3, executors1.size()); + + Assert.assertEquals(100.0, assignedExecutorCpu.get(0), 0.001); + Assert.assertEquals(250.0, assignedExecutorCpu.get(1), 0.001); + Assert.assertEquals(250.0, assignedExecutorCpu.get(2), 0.001); + Assert.assertEquals(600.0, assignedExecutorMemory.get(0), 0.001); + Assert.assertEquals(1200.0, assignedExecutorMemory.get(1), 0.001); + Assert.assertEquals(1200.0, assignedExecutorMemory.get(2), 0.001); + + for (Map.Entry<Double, Double> entry : memoryAvailableToUsed.entrySet()) { + Assert.assertTrue(entry.getKey()- entry.getValue() >= 0); + } + for (Map.Entry<Double, Double> entry : cpuAvailableToUsed.entrySet()) { + Assert.assertTrue(entry.getKey()- entry.getValue() >= 0); + } + Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId())); + } + + @Test + public void testScheduleResilience() { 
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest(); + Map<String, Number> resourceMap = new HashMap<>(); + resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0); + resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0); + Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap); + + TopologyBuilder builder1 = new TopologyBuilder(); + builder1.setSpout("wordSpout1", new TestWordSpout(), 3); + StormTopology stormTopology1 = builder1.createTopology(); + Config config1 = new Config(); + config1.putAll(defaultTopologyConf); + Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 3, 0); + TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 3, executorMap1, 0); + + TopologyBuilder builder2 = new TopologyBuilder(); + builder2.setSpout("wordSpout2", new TestWordSpout(), 2); + StormTopology stormTopology2 = builder2.createTopology(); + Config config2 = new Config(); + config2.putAll(defaultTopologyConf); + // memory requirement is large enough so that two executors can not be fully assigned to one node + config2.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 1280.0); + Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 2, 0); + TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 2, executorMap2, 0); + + // Test1: When a worker fails, RAS does not alter existing assignments on healthy workers + Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1); + ResourceAwareScheduler rs = new ResourceAwareScheduler(); + Map<String, TopologyDetails> topoMap = new HashMap<>(); + topoMap.put(topology2.getId(), topology2); + Topologies topologies = new Topologies(topoMap); + + rs.prepare(config1); + rs.schedule(topologies, cluster); + + SchedulerAssignmentImpl 
assignment = (SchedulerAssignmentImpl)cluster.getAssignmentById(topology2.getId()); + // pick a worker to mock as failed + WorkerSlot failedWorker = new ArrayList<WorkerSlot>(assignment.getSlots()).get(0); + Map<ExecutorDetails, WorkerSlot> executorToSlot = assignment.getExecutorToSlot(); + List<ExecutorDetails> failedExecutors = new ArrayList<>(); + for (Map.Entry<ExecutorDetails, WorkerSlot> entry : executorToSlot.entrySet()) { + if (entry.getValue().equals(failedWorker)) { + failedExecutors.add(entry.getKey()); + } + } + for (ExecutorDetails executor : failedExecutors) { + executorToSlot.remove(executor); // remove executor details assigned to the failed worker + } + Map<ExecutorDetails, WorkerSlot> copyOfOldMapping = new HashMap<>(executorToSlot); + Set<ExecutorDetails> healthyExecutors = copyOfOldMapping.keySet(); + + rs.schedule(topologies, cluster); + SchedulerAssignment newAssignment = cluster.getAssignmentById(topology2.getId()); + Map<ExecutorDetails, WorkerSlot> newExecutorToSlot = newAssignment.getExecutorToSlot(); + + for (ExecutorDetails executor : healthyExecutors) { + Assert.assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor)); + } + Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId())); + // end of Test1 + + // Test2: When a supervisor fails, RAS does not alter existing assignments + executorToSlot = new HashMap<>(); + executorToSlot.put(new ExecutorDetails(0, 0), new WorkerSlot("sup-0", 0)); + executorToSlot.put(new ExecutorDetails(1, 1), new WorkerSlot("sup-0", 1)); + executorToSlot.put(new ExecutorDetails(2, 2), new WorkerSlot("sup-1", 1)); + Map<String, SchedulerAssignmentImpl> existingAssignments = new HashMap<>(); + assignment = new SchedulerAssignmentImpl(topology1.getId(), executorToSlot); + existingAssignments.put(topology1.getId(), assignment); + copyOfOldMapping = new HashMap<>(executorToSlot); + Set<ExecutorDetails> existingExecutors = 
copyOfOldMapping.keySet(); + Map<String, SupervisorDetails> supMap1 = new HashMap<>(supMap); + supMap1.remove("sup-0"); // mock the supervisor sup-0 as a failed supervisor + Cluster cluster1 = new Cluster(iNimbus, supMap1, existingAssignments, config1); + + topoMap = new HashMap<>(); + topoMap.put(topology1.getId(), topology1); + topologies = new Topologies(topoMap); + rs.schedule(topologies, cluster1); + + newAssignment = cluster1.getAssignmentById(topology1.getId()); + newExecutorToSlot = newAssignment.getExecutorToSlot(); + + for (ExecutorDetails executor : existingExecutors) { + Assert.assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor)); + } + Assert.assertEquals("Fully Scheduled", cluster1.getStatusMap().get(topology1.getId())); + // end of Test2 + + // Test3: When a supervisor and a worker on it fails, RAS does not alter existing assignments + executorToSlot = new HashMap<>(); + executorToSlot.put(new ExecutorDetails(0, 0), new WorkerSlot("sup-0", 1)); // the worker to orphan + executorToSlot.put(new ExecutorDetails(1, 1), new WorkerSlot("sup-0", 2)); // the worker that fails + executorToSlot.put(new ExecutorDetails(2, 2), new WorkerSlot("sup-1", 1)); // the healthy worker + existingAssignments = new HashMap<>(); + assignment = new SchedulerAssignmentImpl(topology1.getId(), executorToSlot); + existingAssignments.put(topology1.getId(), assignment); + // delete one worker of sup-0 (failed) from topo1 assignment to enable actual schedule for testing + executorToSlot.remove(new ExecutorDetails(1, 1)); + + copyOfOldMapping = new HashMap<>(executorToSlot); + existingExecutors = copyOfOldMapping.keySet(); // namely the two eds on the orphaned worker and the healthy worker + supMap1 = new HashMap<>(supMap); + supMap1.remove("sup-0"); // mock the supervisor sup-0 as a failed supervisor + cluster1 = new Cluster(iNimbus, supMap1, existingAssignments, config1); + + topoMap = new HashMap<>(); + topoMap.put(topology1.getId(), topology1); + 
topologies = new Topologies(topoMap); + rs.schedule(topologies, cluster1); + + newAssignment = cluster1.getAssignmentById(topology1.getId()); + newExecutorToSlot = newAssignment.getExecutorToSlot(); + + for (ExecutorDetails executor : existingExecutors) { + Assert.assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor)); + } + Assert.assertEquals("Fully Scheduled", cluster1.getStatusMap().get(topology1.getId())); + // end of Test3 + + // Test4: Scheduling a new topology does not disturb other assignments unnecessarily + cluster1 = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1); + topoMap = new HashMap<>(); + topoMap.put(topology1.getId(), topology1); + topologies = new Topologies(topoMap); + rs.schedule(topologies, cluster1); + assignment = (SchedulerAssignmentImpl)cluster1.getAssignmentById(topology1.getId()); + executorToSlot = assignment.getExecutorToSlot(); + copyOfOldMapping = new HashMap<>(executorToSlot); + + topoMap.put(topology2.getId(), topology2); + topologies = new Topologies(topoMap); + rs.schedule(topologies, cluster1); + + newAssignment = (SchedulerAssignmentImpl)cluster1.getAssignmentById(topology1.getId()); + newExecutorToSlot = newAssignment.getExecutorToSlot(); + + for (ExecutorDetails executor : copyOfOldMapping.keySet()) { + Assert.assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor)); + } + Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster1.getStatusMap().get(topology1.getId())); + Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster1.getStatusMap().get(topology2.getId())); + } + + @Test + public void testHeterogeneousCluster() { + INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest(); + Map<String, Number> resourceMap1 = new HashMap<>(); // strong supervisor node + resourceMap1.put(Config.SUPERVISOR_CPU_CAPACITY, 800.0); + 
resourceMap1.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 4096.0); + Map<String, Number> resourceMap2 = new HashMap<>(); // weak supervisor node + resourceMap2.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0); + resourceMap2.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0); + + Map<String, SupervisorDetails> supMap = new HashMap<String, SupervisorDetails>(); + for (int i = 0; i < 2; i++) { + List<Number> ports = new LinkedList<Number>(); + for (int j = 0; j < 4; j++) { + ports.add(j); + } + SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, (Map)(i == 0 ? resourceMap1 : resourceMap2)); + supMap.put(sup.getId(), sup); + } + + // topo1 has one single huge task that can not be handled by the small-super + TopologyBuilder builder1 = new TopologyBuilder(); + builder1.setSpout("wordSpout1", new TestWordSpout(), 1).setCPULoad(300.0).setMemoryLoad(2000.0, 48.0); + StormTopology stormTopology1 = builder1.createTopology(); + Config config1 = new Config(); + config1.putAll(defaultTopologyConf); + Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 0); + TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 1, executorMap1, 0); + + // topo2 has 4 large tasks + TopologyBuilder builder2 = new TopologyBuilder(); + builder2.setSpout("wordSpout2", new TestWordSpout(), 4).setCPULoad(100.0).setMemoryLoad(500.0, 12.0); + StormTopology stormTopology2 = builder2.createTopology(); + Config config2 = new Config(); + config2.putAll(defaultTopologyConf); + Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 4, 0); + TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0); + + // topo3 has 4 large tasks + TopologyBuilder builder3 = new TopologyBuilder(); + builder3.setSpout("wordSpout3", new TestWordSpout(), 4).setCPULoad(20.0).setMemoryLoad(200.0, 56.0); + 
StormTopology stormTopology3 = builder3.createTopology(); + Config config3 = new Config(); + config3.putAll(defaultTopologyConf); + Map<ExecutorDetails, String> executorMap3 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology3, 4, 0); + TopologyDetails topology3 = new TopologyDetails("topology3", config2, stormTopology3, 1, executorMap3, 0); + + // topo4 has 12 small tasks, whose mem usage does not exactly divide a node's mem capacity + TopologyBuilder builder4 = new TopologyBuilder(); + builder4.setSpout("wordSpout4", new TestWordSpout(), 12).setCPULoad(30.0).setMemoryLoad(100.0, 0.0); + StormTopology stormTopology4 = builder4.createTopology(); + Config config4 = new Config(); + config4.putAll(defaultTopologyConf); + Map<ExecutorDetails, String> executorMap4 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology4, 12, 0); + TopologyDetails topology4 = new TopologyDetails("topology4", config4, stormTopology4, 1, executorMap4, 0); + + // topo5 has 40 small tasks, it should be able to exactly use up both the cpu and mem in the cluster + TopologyBuilder builder5 = new TopologyBuilder(); + builder5.setSpout("wordSpout5", new TestWordSpout(), 40).setCPULoad(25.0).setMemoryLoad(100.0, 28.0); + StormTopology stormTopology5 = builder5.createTopology(); + Config config5 = new Config(); + config5.putAll(defaultTopologyConf); + Map<ExecutorDetails, String> executorMap5 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology5, 40, 0); + TopologyDetails topology5 = new TopologyDetails("topology5", config5, stormTopology5, 1, executorMap5, 0); + + // Test1: Launch topo 1-3 together, it should be able to use up either mem or cpu resource due to exact division + Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1); + ResourceAwareScheduler rs = new ResourceAwareScheduler(); + Map<String, TopologyDetails> topoMap = new HashMap<>(); + topoMap.put(topology1.getId(), topology1); + 
topoMap.put(topology2.getId(), topology2); + topoMap.put(topology3.getId(), topology3); + Topologies topologies = new Topologies(topoMap); + rs.prepare(config1); + rs.schedule(topologies, cluster); + + Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId())); + Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId())); + Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology3.getId())); + + Map<SupervisorDetails, Double> superToCpu = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies); + Map<SupervisorDetails, Double> superToMem = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies); + + final Double EPSILON = 0.0001; + for (SupervisorDetails supervisor : supMap.values()) { + Double cpuAvailable = supervisor.getTotalCPU(); + Double memAvailable = supervisor.getTotalMemory(); + Double cpuUsed = superToCpu.get(supervisor); + Double memUsed = superToMem.get(supervisor); + Assert.assertTrue((Math.abs(memAvailable - memUsed) < EPSILON) || (Math.abs(cpuAvailable - cpuUsed) < EPSILON)); + } + // end of Test1 + + // Test2: Launch topo 1, 2 and 4, they together request a little more mem than available, so one of the 3 topos will not be scheduled + cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1); + topoMap = new HashMap<>(); + topoMap.put(topology1.getId(), topology1); + topoMap.put(topology2.getId(), topology2); + topoMap.put(topology4.getId(), topology4); + topologies = new Topologies(topoMap); + rs.prepare(config1); + rs.schedule(topologies, cluster); + int numTopologiesAssigned = 0; + if (cluster.getStatusMap().get(topology1.getId()).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) { + numTopologiesAssigned++; + } + if 
(cluster.getStatusMap().get(topology2.getId()).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) { + numTopologiesAssigned++; + } + if (cluster.getStatusMap().get(topology4.getId()).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) { + numTopologiesAssigned++; + } + Assert.assertEquals(2, numTopologiesAssigned); + //end of Test2 + + //Test3: "Launch topo5 only, both mem and cpu should be exactly used up" + cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1); + topoMap = new HashMap<>(); + topoMap.put(topology5.getId(), topology5); + topologies = new Topologies(topoMap); + rs.prepare(config1); + rs.schedule(topologies, cluster); + superToCpu = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies); + superToMem = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies); + for (SupervisorDetails supervisor : supMap.values()) { + Double cpuAvailable = supervisor.getTotalCPU(); + Double memAvailable = supervisor.getTotalMemory(); + Double cpuUsed = superToCpu.get(supervisor); + Double memUsed = superToMem.get(supervisor); + Assert.assertEquals(cpuAvailable, cpuUsed, 0.0001); + Assert.assertEquals(memAvailable, memUsed, 0.0001); + } + //end of Test3 + } + + @Test + public void testTopologyWorkerMaxHeapSize() { + // Test1: If RAS spreads executors across multiple workers based on the set limit for a worker used by the topology + INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest(); + Map<String, Number> resourceMap = new HashMap<>(); + resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0); + resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0); + Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap); + + TopologyBuilder builder1 = new TopologyBuilder(); + builder1.setSpout("wordSpout1", new TestWordSpout(), 4); + StormTopology stormTopology1 = 
builder1.createTopology(); + Config config1 = new Config(); + config1.putAll(defaultTopologyConf); + config1.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0); + Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 4, 0); + TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 1, executorMap1, 0); + Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1); + ResourceAwareScheduler rs = new ResourceAwareScheduler(); + Map<String, TopologyDetails> topoMap = new HashMap<>(); + topoMap.put(topology1.getId(), topology1); + Topologies topologies = new Topologies(topoMap); + rs.prepare(config1); + rs.schedule(topologies, cluster); + Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId())); + Assert.assertEquals(4, cluster.getAssignedNumWorkers(topology1)); + + // Test2: test when no more workers are available due to topology worker max heap size limit but there is memory still available + // wordSpout2 is going to contain 5 executors that need scheduling. Each of those executors has a memory requirement of 128.0 MB + // The cluster contains 4 free WorkerSlots. 
For this topology each worker is limited to a max heap size of 128.0 + // Thus, one executor is not going to be able to get scheduled, thus failing the scheduling of this topology, and no executors of this topology will be scheduled + TopologyBuilder builder2 = new TopologyBuilder(); + builder2.setSpout("wordSpout2", new TestWordSpout(), 5); + StormTopology stormTopology2 = builder2.createTopology(); + Config config2 = new Config(); + config2.putAll(defaultTopologyConf); + config2.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0); + Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 5, 0); + TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0); + cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config2); + topoMap = new HashMap<>(); + topoMap.put(topology2.getId(), topology2); + topologies = new Topologies(topoMap); + rs.prepare(config2); + rs.schedule(topologies, cluster); + Assert.assertEquals("Not enough resources to schedule - 0/5 executors scheduled", cluster.getStatusMap().get(topology2.getId())); + Assert.assertEquals(5, cluster.getUnassignedExecutors(topology2).size()); + } + + @Test(expected=IllegalArgumentException.class) + public void testMemoryLoadLargerThanMaxHeapSize() throws Exception { + // Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=128.0 < 129.0, + // Largest memory requirement of a component in the topology). 
+ TopologyBuilder builder1 = new TopologyBuilder(); + builder1.setSpout("wordSpout1", new TestWordSpout(), 4); + StormTopology stormTopology1 = builder1.createTopology(); + Config config1 = new Config(); + config1.putAll(defaultTopologyConf); + config1.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0); + config1.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 129.0); + StormSubmitter.submitTopologyWithProgressBar("test", config1, stormTopology1); + } + @Test public void TestReadInResourceAwareSchedulerUserPools() { Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false); http://git-wip-us.apache.org/repos/asf/storm/blob/c1b93de1/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java index f21645b..7cd21ce 100644 --- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java +++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java @@ -29,6 +29,8 @@ import org.apache.storm.scheduler.SupervisorDetails; import org.apache.storm.scheduler.Topologies; import org.apache.storm.scheduler.TopologyDetails; import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.SchedulerAssignment; +import org.apache.storm.scheduler.Cluster; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -50,6 +52,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.ArrayList; import java.util.Map; import java.util.Random; import java.util.Set; @@ -109,7 +112,7 @@ public class 
TestUtilsForResourceAwareScheduler { public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology, int spoutParallelism, int boltParallelism) { Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, String>(); int startTask = 0; - int endTask = 1; + int endTask = 0; for (Map.Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) { for (int i = 0; i < spoutParallelism; i++) { retMap.put(new ExecutorDetails(startTask, endTask), entry.getKey()); @@ -285,4 +288,72 @@ public class TestUtilsForResourceAwareScheduler { } return ret; } + + public static Map<SupervisorDetails, Double> getSupervisorToMemoryUsage(Cluster cluster, Topologies topologies) { + Map<SupervisorDetails, Double> superToMem = new HashMap<>(); + Collection<SchedulerAssignment> assignments = cluster.getAssignments().values(); + Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values(); + for (SupervisorDetails supervisor : supervisors) { + superToMem.put(supervisor, 0.0); + } + + for (SchedulerAssignment assignment : assignments) { + Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>(); + Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>(); + TopologyDetails topology = topologies.getById(assignment.getTopologyId()); + for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment.getExecutorToSlot().entrySet()) { + executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId())); + } + for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) { + List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue()); + if (executorsOnSupervisor == null) { + executorsOnSupervisor = new ArrayList<>(); + supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor); + } + executorsOnSupervisor.add(entry.getKey()); + } + for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : 
supervisorToExecutors.entrySet()) { + Double supervisorUsedMemory = 0.0; + for (ExecutorDetails executor: entry.getValue()) { + supervisorUsedMemory += topology.getTotalMemReqTask(executor); + } + superToMem.put(entry.getKey(), superToMem.get(entry.getKey()) + supervisorUsedMemory); + } + } + return superToMem; + } + + public static Map<SupervisorDetails, Double> getSupervisorToCpuUsage(Cluster cluster, Topologies topologies) { + Map<SupervisorDetails, Double> superToCpu = new HashMap<>(); + Collection<SchedulerAssignment> assignments = cluster.getAssignments().values(); + Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values(); + for (SupervisorDetails supervisor : supervisors) { + superToCpu.put(supervisor, 0.0); + } + + for (SchedulerAssignment assignment : assignments) { + Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>(); + Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>(); + TopologyDetails topology = topologies.getById(assignment.getTopologyId()); + for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment.getExecutorToSlot().entrySet()) { + executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId())); + } + for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) { + List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue()); + if (executorsOnSupervisor == null) { + executorsOnSupervisor = new ArrayList<>(); + supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor); + } + executorsOnSupervisor.add(entry.getKey()); + } + for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) { + Double supervisorUsedCpu = 0.0; + for (ExecutorDetails executor: entry.getValue()) { + supervisorUsedCpu += topology.getTotalCpuReqTask(executor); + } + superToCpu.put(entry.getKey(), superToCpu.get(entry.getKey()) + supervisorUsedCpu); + 
} + } + return superToCpu; + } }
