[ https://issues.apache.org/jira/browse/STORM-1300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15212414#comment-15212414 ]
ASF GitHub Bot commented on STORM-1300:
---------------------------------------
Github user jerrypeng commented on a diff in the pull request:
https://github.com/apache/storm/pull/1232#discussion_r57491392
--- Diff: storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java ---
@@ -54,6 +61,681 @@
private static int currentTime = 1450418597;
+ private static final Config defaultTopologyConf = new Config();
+
+ @BeforeClass
+ public static void initConf() {
+ defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
+ defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ defaultTopologyConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+ defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
+ defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 128.0);
+ defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0);
+ defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 8192.0);
+ defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0);
+ defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
+ }
+
+ @Test
+ public void testRASNodeSlotAssign() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, resourceMap);
+ Topologies topologies = new Topologies(new HashMap<String, TopologyDetails>());
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), new HashMap());
+ Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
+ Assert.assertEquals(5, nodes.size());
+ RAS_Node node = nodes.get("sup-0");
+
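+ // a fresh node with no assignments should be alive and fully free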
+ Assert.assertEquals("sup-0", node.getId());
+ Assert.assertTrue(node.isAlive());
+ Assert.assertEquals(0, node.getRunningTopologies().size());
+ Assert.assertTrue(node.isTotallyFree());
+ Assert.assertEquals(4, node.totalSlotsFree());
+ Assert.assertEquals(0, node.totalSlotsUsed());
+ Assert.assertEquals(4, node.totalSlots());
+
+ TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 0, 2, 0, 0, 0);
+
+ List<ExecutorDetails> executors11 = new ArrayList<>();
+ executors11.add(new ExecutorDetails(1, 1));
+ node.assign(node.getFreeSlots().iterator().next(), topology1, executors11);
+ Assert.assertEquals(1, node.getRunningTopologies().size());
+ Assert.assertFalse(node.isTotallyFree());
+ Assert.assertEquals(3, node.totalSlotsFree());
+ Assert.assertEquals(1, node.totalSlotsUsed());
+ Assert.assertEquals(4, node.totalSlots());
+
+ List<ExecutorDetails> executors12 = new ArrayList<>();
+ executors12.add(new ExecutorDetails(2, 2));
+ node.assign(node.getFreeSlots().iterator().next(), topology1, executors12);
+ Assert.assertEquals(1, node.getRunningTopologies().size());
+ Assert.assertFalse(node.isTotallyFree());
+ Assert.assertEquals(2, node.totalSlotsFree());
+ Assert.assertEquals(2, node.totalSlotsUsed());
+ Assert.assertEquals(4, node.totalSlots());
+
+ TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 0, 2, 0, 0, 0);
+
+ List<ExecutorDetails> executors21 = new ArrayList<>();
+ executors21.add(new ExecutorDetails(1, 1));
+ node.assign(node.getFreeSlots().iterator().next(), topology2, executors21);
+ Assert.assertEquals(2, node.getRunningTopologies().size());
+ Assert.assertFalse(node.isTotallyFree());
+ Assert.assertEquals(1, node.totalSlotsFree());
+ Assert.assertEquals(3, node.totalSlotsUsed());
+ Assert.assertEquals(4, node.totalSlots());
+
+ List<ExecutorDetails> executors22 = new ArrayList<>();
+ executors22.add(new ExecutorDetails(2, 2));
+ node.assign(node.getFreeSlots().iterator().next(), topology2, executors22);
+ Assert.assertEquals(2, node.getRunningTopologies().size());
+ Assert.assertFalse(node.isTotallyFree());
+ Assert.assertEquals(0, node.totalSlotsFree());
+ Assert.assertEquals(4, node.totalSlotsUsed());
+ Assert.assertEquals(4, node.totalSlots());
+
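+ // freeing every slot should return the node to its initial, fully-free state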
+ node.freeAllSlots();
+ Assert.assertEquals(0, node.getRunningTopologies().size());
+ Assert.assertTrue(node.isTotallyFree());
+ Assert.assertEquals(4, node.totalSlotsFree());
+ Assert.assertEquals(0, node.totalSlotsUsed());
+ Assert.assertEquals(4, node.totalSlots());
+ }
+
+ @Test
+ public void sanityTestOfScheduling() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(1, 2, resourceMap);
+
+ Config config = new Config();
+ config.putAll(defaultTopologyConf);
+
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 0, 0);
+ Map<String, TopologyDetails> topoMap = new HashMap<>();
+ topoMap.put(topology1.getId(), topology1);
+ Topologies topologies = new Topologies(topoMap);
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ SchedulerAssignment assignment = cluster.getAssignmentById(topology1.getId());
+ Set<WorkerSlot> assignedSlots = assignment.getSlots();
+ Set<String> nodesIDs = new HashSet<>();
+ for (WorkerSlot slot : assignedSlots) {
+ nodesIDs.add(slot.getNodeId());
+ }
+ Collection<ExecutorDetails> executors = assignment.getExecutors();
+
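+ // the two executors (one spout, one bolt) should fit in a single worker on a single node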
+ Assert.assertEquals(1, assignedSlots.size());
+ Assert.assertEquals(1, nodesIDs.size());
+ Assert.assertEquals(2, executors.size());
+ Assert.assertEquals("Running - Fully Scheduled by
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+ }
+
+ @Test
+ public void testTopologyWithMultipleSpouts() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 4, resourceMap);
+
+ TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
+ builder1.setSpout("wordSpout1", new TestWordSpout(), 1);
+ builder1.setSpout("wordSpout2", new TestWordSpout(), 1);
+ builder1.setBolt("wordCountBolt1", new TestWordCounter(), 1).shuffleGrouping("wordSpout1").shuffleGrouping("wordSpout2");
+ builder1.setBolt("wordCountBolt2", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
+ builder1.setBolt("wordCountBolt3", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
+ builder1.setBolt("wordCountBolt4", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt2");
+ builder1.setBolt("wordCountBolt5", new TestWordCounter(), 1).shuffleGrouping("wordSpout2");
+ StormTopology stormTopology1 = builder1.createTopology();
+
+ Config config = new Config();
+ config.putAll(defaultTopologyConf);
+ Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1);
+ TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0);
+
+ TopologyBuilder builder2 = new TopologyBuilder(); // a topology with two unconnected partitions
+ builder2.setSpout("wordSpoutX", new TestWordSpout(), 1);
+ builder2.setSpout("wordSpoutY", new TestWordSpout(), 1);
+ StormTopology stormTopology2 = builder2.createTopology();
+ Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 1, 0);
+ TopologyDetails topology2 = new TopologyDetails("topology2", config, stormTopology2, 0, executorMap2, 0);
+
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ Map<String, TopologyDetails> topoMap = new HashMap<>();
+ topoMap.put(topology1.getId(), topology1);
+ topoMap.put(topology2.getId(), topology2);
+ Topologies topologies = new Topologies(topoMap);
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
+ Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
+ Set<String> nodesIDs1 = new HashSet<>();
+ for (WorkerSlot slot : assignedSlots1) {
+ nodesIDs1.add(slot.getNodeId());
+ }
+ Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
+
+ Assert.assertEquals(1, assignedSlots1.size());
+ Assert.assertEquals(1, nodesIDs1.size());
+ Assert.assertEquals(7, executors1.size());
+ Assert.assertEquals("Running - Fully Scheduled by
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+
+ SchedulerAssignment assignment2 = cluster.getAssignmentById(topology2.getId());
+ Set<WorkerSlot> assignedSlots2 = assignment2.getSlots();
+ Set<String> nodesIDs2 = new HashSet<>();
+ for (WorkerSlot slot : assignedSlots2) {
+ nodesIDs2.add(slot.getNodeId());
+ }
+ Collection<ExecutorDetails> executors2 = assignment2.getExecutors();
+
+ Assert.assertEquals(1, assignedSlots2.size());
+ Assert.assertEquals(1, nodesIDs2.size());
+ Assert.assertEquals(2, executors2.size());
+ Assert.assertEquals("Running - Fully Scheduled by
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
+ }
+
+ @Test
+ public void testTopologySetCpuAndMemLoad() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
+
+ TopologyBuilder builder1 = new TopologyBuilder(); // a topology with explicit CPU and memory loads
+ builder1.setSpout("wordSpout", new TestWordSpout(), 1).setCPULoad(20.0).setMemoryLoad(200.0);
+ builder1.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(20.0).setMemoryLoad(200.0);
+ StormTopology stormTopology1 = builder1.createTopology();
+
+ Config config = new Config();
+ config.putAll(defaultTopologyConf);
+ Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1);
+ TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0);
+
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+ Map<String, TopologyDetails> topoMap = new HashMap<>();
+ topoMap.put(topology1.getId(), topology1);
+ Topologies topologies = new Topologies(topoMap);
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
+ Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
+ double assignedMemory = 0.0;
+ double assignedCpu = 0.0;
+ Set<String> nodesIDs1 = new HashSet<>();
+ for (WorkerSlot slot : assignedSlots1) {
+ nodesIDs1.add(slot.getNodeId());
+ assignedMemory += slot.getAllocatedMemOnHeap() + slot.getAllocatedMemOffHeap();
+ assignedCpu += slot.getAllocatedCpu();
+ }
+ Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
+
+ Assert.assertEquals(1, assignedSlots1.size());
+ Assert.assertEquals(1, nodesIDs1.size());
+ Assert.assertEquals(2, executors1.size());
+ Assert.assertEquals(400.0, assignedMemory, 0.001);
+ Assert.assertEquals(40.0, assignedCpu, 0.001);
+ Assert.assertEquals("Running - Fully Scheduled by
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+ }
+
+ @Test
+ public void testResourceLimitation() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
+
+ TopologyBuilder builder1 = new TopologyBuilder(); // a topology whose resource requests approach the per-node limits
+ builder1.setSpout("wordSpout", new TestWordSpout(), 2).setCPULoad(250.0).setMemoryLoad(1000.0, 200.0);
+ builder1.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(100.0).setMemoryLoad(500.0, 100.0);
+ StormTopology stormTopology1 = builder1.createTopology();
+
+ Config config = new Config();
+ config.putAll(defaultTopologyConf);
+ Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 2, 1);
+ TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 2, executorMap1, 0);
+
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+ Map<String, TopologyDetails> topoMap = new HashMap<>();
+ topoMap.put(topology1.getId(), topology1);
+ Topologies topologies = new Topologies(topoMap);
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
+ Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
+ Set<String> nodesIDs1 = new HashSet<>();
+ for (WorkerSlot slot : assignedSlots1) {
+ nodesIDs1.add(slot.getNodeId());
+ }
+ Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
+ List<Double> assignedExecutorMemory = new ArrayList<>();
+ List<Double> assignedExecutorCpu = new ArrayList<>();
+ for (ExecutorDetails executor : executors1) {
+ assignedExecutorMemory.add(topology1.getTotalMemReqTask(executor));
+ assignedExecutorCpu.add(topology1.getTotalCpuReqTask(executor));
+ }
+ Collections.sort(assignedExecutorCpu);
+ Collections.sort(assignedExecutorMemory);
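+ // sorting lets the assertions below check per-executor requests from smallest to largest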
+
+ Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>();
+ Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>();
+ Map<Double, Double> cpuAvailableToUsed = new HashMap<>();
+ Map<Double, Double> memoryAvailableToUsed = new HashMap<>();
+
+ for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment1.getExecutorToSlot().entrySet()) {
+ executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId()));
+ }
+ for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) {
+ List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue());
+ if (executorsOnSupervisor == null) {
+ executorsOnSupervisor = new ArrayList<>();
+ supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor);
+ }
+ executorsOnSupervisor.add(entry.getKey());
+ }
+ for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
+ Double supervisorTotalCpu = entry.getKey().getTotalCPU();
+ Double supervisorTotalMemory = entry.getKey().getTotalMemory();
+ Double supervisorUsedCpu = 0.0;
+ Double supervisorUsedMemory = 0.0;
+ for (ExecutorDetails executor : entry.getValue()) {
+ // accumulate each executor's requests into the "used" totals (the original swapped
+ // memory and CPU here and mutated the supervisor's total instead of its usage)
+ supervisorUsedMemory += topology1.getTotalMemReqTask(executor);
+ supervisorUsedCpu += topology1.getTotalCpuReqTask(executor);
+ }
+ cpuAvailableToUsed.put(supervisorTotalCpu, supervisorUsedCpu);
+ memoryAvailableToUsed.put(supervisorTotalMemory, supervisorUsedMemory);
+ }
+ // executor0 resides on one worker (on one node); executor1 and executor2 share another worker (on the other node)
+ Assert.assertEquals(2, assignedSlots1.size());
+ Assert.assertEquals(2, nodesIDs1.size());
+ Assert.assertEquals(3, executors1.size());
+
+ Assert.assertEquals(100.0, assignedExecutorCpu.get(0), 0.001);
+ Assert.assertEquals(250.0, assignedExecutorCpu.get(1), 0.001);
+ Assert.assertEquals(250.0, assignedExecutorCpu.get(2), 0.001);
+ Assert.assertEquals(600.0, assignedExecutorMemory.get(0), 0.001);
+ Assert.assertEquals(1200.0, assignedExecutorMemory.get(1), 0.001);
+ Assert.assertEquals(1200.0, assignedExecutorMemory.get(2), 0.001);
+
+ for (Map.Entry<Double, Double> entry : memoryAvailableToUsed.entrySet()) {
+ Assert.assertTrue(entry.getKey() - entry.getValue() >= 0);
+ }
+ for (Map.Entry<Double, Double> entry : cpuAvailableToUsed.entrySet()) {
+ Assert.assertTrue(entry.getKey() - entry.getValue() >= 0);
+ }
+ Assert.assertEquals("Running - Fully Scheduled by
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+ }
+
+ @Test
+ public void testScheduleResilience() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
+
+ TopologyBuilder builder1 = new TopologyBuilder();
+ builder1.setSpout("wordSpout1", new TestWordSpout(), 3);
+ StormTopology stormTopology1 = builder1.createTopology();
+ Config config1 = new Config();
+ config1.putAll(defaultTopologyConf);
+ Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 3, 0);
+ TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 3, executorMap1, 0);
+
+ TopologyBuilder builder2 = new TopologyBuilder();
+ builder2.setSpout("wordSpout2", new TestWordSpout(), 2);
+ StormTopology stormTopology2 = builder2.createTopology();
+ Config config2 = new Config();
+ config2.putAll(defaultTopologyConf);
+ // memory requirement is large enough that two executors cannot both be fully assigned to one node
+ config2.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 1280.0);
+ Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 2, 0);
+ TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 2, executorMap2, 0);
+
+ // Test1: When a worker fails, RAS does not alter existing assignments on healthy workers
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1);
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+ Map<String, TopologyDetails> topoMap = new HashMap<>();
+ topoMap.put(topology2.getId(), topology2);
+ Topologies topologies = new Topologies(topoMap);
+
+ rs.prepare(config1);
+ rs.schedule(topologies, cluster);
+
+ SchedulerAssignmentImpl assignment = (SchedulerAssignmentImpl) cluster.getAssignmentById(topology2.getId());
+ // pick a worker to mock as failed
+ WorkerSlot failedWorker = new ArrayList<WorkerSlot>(assignment.getSlots()).get(0);
+ Map<ExecutorDetails, WorkerSlot> executorToSlot = assignment.getExecutorToSlot();
+ List<ExecutorDetails> failedExecutors = new ArrayList<>();
+ for (Map.Entry<ExecutorDetails, WorkerSlot> entry : executorToSlot.entrySet()) {
--- End diff --
There is a method in SchedulerAssignmentImpl called getSlotToExecutors that you can use.
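
For example (a minimal sketch, not tested against this branch; it assumes getSlotToExecutors returns a Map<WorkerSlot, Collection<ExecutorDetails>> keyed by slot, so the exact return type may differ), the executors on the failed worker could be looked up directly instead of scanning the whole executorToSlot map:

    // hypothetical replacement for the manual scan over executorToSlot
    Map<WorkerSlot, Collection<ExecutorDetails>> slotToExecutors = assignment.getSlotToExecutors();
    List<ExecutorDetails> failedExecutors = new ArrayList<>(slotToExecutors.get(failedWorker));
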
> port backtype.storm.scheduler.resource-aware-scheduler-test to java
> --------------------------------------------------------------------
>
> Key: STORM-1300
> URL: https://issues.apache.org/jira/browse/STORM-1300
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Zhuo Liu
> Labels: java-migration, jstorm-merger
>
> Test RAS