[ 
https://issues.apache.org/jira/browse/STORM-1300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15212414#comment-15212414
 ] 

ASF GitHub Bot commented on STORM-1300:
---------------------------------------

Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1232#discussion_r57491392
  
    --- Diff: 
storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
 ---
    @@ -54,6 +61,681 @@
     
         private static int currentTime = 1450418597;
     
    +    private static final Config defaultTopologyConf = new Config();
    +
    +    @BeforeClass
    +    public static void initConf() {
    +        defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, 
"org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
    +        
defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, 
org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
    +        
defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, 
org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
    +
    +        defaultTopologyConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, 
org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
    +        
defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
    +        
defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
128.0);
    +        
defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
0.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 
8192.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
    +    }
    +
    +    @Test
    +    public void testRASNodeSlotAssign() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, resourceMap);
    +        Topologies topologies = new Topologies(new HashMap<String, 
TopologyDetails>());
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), new HashMap());
    +        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, 
topologies);
    +        Assert.assertEquals(5, nodes.size());
    +        RAS_Node node = nodes.get("sup-0");
    +
    +        Assert.assertEquals("sup-0", node.getId());
    +        Assert.assertTrue(node.isAlive());
    +        Assert.assertEquals(0, node.getRunningTopologies().size());
    +        Assert.assertTrue(node.isTotallyFree());
    +        Assert.assertEquals(4, node.totalSlotsFree());
    +        Assert.assertEquals(0, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 
0, 2, 0, 0, 0);
    +
    +        List<ExecutorDetails> executors11 = new ArrayList<>();
    +        executors11.add(new ExecutorDetails(1, 1));
    +        node.assign(node.getFreeSlots().iterator().next(), topology1, 
executors11);
    +        Assert.assertEquals(1, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(3, node.totalSlotsFree());
    +        Assert.assertEquals(1, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        List<ExecutorDetails> executors12 = new ArrayList<>();
    +        executors12.add(new ExecutorDetails(2, 2));
    +        node.assign(node.getFreeSlots().iterator().next(), topology1, 
executors12);
    +        Assert.assertEquals(1, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(2, node.totalSlotsFree());
    +        Assert.assertEquals(2, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        TopologyDetails topology2 = 
TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 
0, 2, 0, 0, 0);
    +
    +        List<ExecutorDetails> executors21 = new ArrayList<>();
    +        executors21.add(new ExecutorDetails(1, 1));
    +        node.assign(node.getFreeSlots().iterator().next(), topology2, 
executors21);
    +        Assert.assertEquals(2, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(1, node.totalSlotsFree());
    +        Assert.assertEquals(3, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        List<ExecutorDetails> executors22 = new ArrayList<>();
    +        executors22.add(new ExecutorDetails(2, 2));
    +        node.assign(node.getFreeSlots().iterator().next(), topology2, 
executors22);
    +        Assert.assertEquals(2, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(0, node.totalSlotsFree());
    +        Assert.assertEquals(4, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        node.freeAllSlots();
    +        Assert.assertEquals(0, node.getRunningTopologies().size());
    +        Assert.assertTrue(node.isTotallyFree());
    +        Assert.assertEquals(4, node.totalSlotsFree());
    +        Assert.assertEquals(0, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +    }
    +
    +    @Test
    +    public void sanityTestOfScheduling() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(1, 2, resourceMap);
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 
0, 0);
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment = 
cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots = assignment.getSlots();
    +        Set<String> nodesIDs = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots) {
    +            nodesIDs.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors = assignment.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots.size());
    +        Assert.assertEquals(1, nodesIDs.size());
    +        Assert.assertEquals(2, executors.size());
    +        Assert.assertEquals("Running - Fully Scheduled by 
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +    }
    +
    +    @Test
    +    public void testTopologyWithMultipleSpouts() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(2, 4, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology 
with multiple spouts
    +        builder1.setSpout("wordSpout1", new TestWordSpout(), 1);
    +        builder1.setSpout("wordSpout2", new TestWordSpout(), 1);
    +        builder1.setBolt("wordCountBolt1", new TestWordCounter(), 
1).shuffleGrouping("wordSpout1").shuffleGrouping("wordSpout2");
    +        builder1.setBolt("wordCountBolt2", new TestWordCounter(), 
1).shuffleGrouping("wordCountBolt1");
    +        builder1.setBolt("wordCountBolt3", new TestWordCounter(), 
1).shuffleGrouping("wordCountBolt1");
    +        builder1.setBolt("wordCountBolt4", new TestWordCounter(), 
1).shuffleGrouping("wordCountBolt2");
    +        builder1.setBolt("wordCountBolt5", new TestWordCounter(), 
1).shuffleGrouping("wordSpout2");
    +        StormTopology stormTopology1 = builder1.createTopology();
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +        Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1);
    +        TopologyDetails topology1 = new TopologyDetails("topology1", 
config, stormTopology1, 0, executorMap1, 0);
    +
    +        TopologyBuilder builder2 = new TopologyBuilder(); // a topology 
with two unconnected partitions
    +        builder2.setSpout("wordSpoutX", new TestWordSpout(), 1);
    +        builder2.setSpout("wordSpoutY", new TestWordSpout(), 1);
    +        StormTopology stormTopology2 = builder2.createTopology();
    +        Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 1, 0);
    +        TopologyDetails topology2 = new TopologyDetails("topology2", 
config, stormTopology2, 0, executorMap2, 0);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        topoMap.put(topology2.getId(), topology2);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment1 = 
cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
    +        Set<String> nodesIDs1 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots1) {
    +            nodesIDs1.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors1 = 
assignment1.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots1.size());
    +        Assert.assertEquals(1, nodesIDs1.size());
    +        Assert.assertEquals(7, executors1.size());
    +        Assert.assertEquals("Running - Fully Scheduled by 
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +
    +        SchedulerAssignment assignment2 = 
cluster.getAssignmentById(topology2.getId());
    +        Set<WorkerSlot> assignedSlots2 = assignment2.getSlots();
    +        Set<String> nodesIDs2 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots2) {
    +            nodesIDs2.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors2 = 
assignment2.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots2.size());
    +        Assert.assertEquals(1, nodesIDs2.size());
    +        Assert.assertEquals(2, executors2.size());
    +        Assert.assertEquals("Running - Fully Scheduled by 
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
    +    }
    +
    +    @Test
    +    public void testTopologySetCpuAndMemLoad() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology 
with multiple spouts
    +        builder1.setSpout("wordSpout", new TestWordSpout(), 
1).setCPULoad(20.0).setMemoryLoad(200.0);
    +        builder1.setBolt("wordCountBolt", new TestWordCounter(), 
1).shuffleGrouping("wordSpout").setCPULoad(20.0).setMemoryLoad(200.0);
    +        StormTopology stormTopology1 = builder1.createTopology();
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +        Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1);
    +        TopologyDetails topology1 = new TopologyDetails("topology1", 
config, stormTopology1, 0, executorMap1, 0);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment1 = 
cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
    +        double assignedMemory = 0.0;
    +        double assignedCpu = 0.0;
    +        Set<String> nodesIDs1 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots1) {
    +            nodesIDs1.add(slot.getNodeId());
    +            assignedMemory += slot.getAllocatedMemOnHeap() + 
slot.getAllocatedMemOffHeap();
    +            assignedCpu += slot.getAllocatedCpu();
    +
    +        }
    +        Collection<ExecutorDetails> executors1 = 
assignment1.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots1.size());
    +        Assert.assertEquals(1, nodesIDs1.size());
    +        Assert.assertEquals(2, executors1.size());
    +        Assert.assertEquals(400.0, assignedMemory, 0.001);
    +        Assert.assertEquals(40.0, assignedCpu, 0.001);
    +        Assert.assertEquals("Running - Fully Scheduled by 
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +    }
    +
    +    @Test
    +    public void testResourceLimitation() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology 
with multiple spouts
    +        builder1.setSpout("wordSpout", new TestWordSpout(), 
2).setCPULoad(250.0).setMemoryLoad(1000.0, 200.0);
    +        builder1.setBolt("wordCountBolt", new TestWordCounter(), 
1).shuffleGrouping("wordSpout").setCPULoad(100.0).setMemoryLoad(500.0, 100.0);
    +        StormTopology stormTopology1 = builder1.createTopology();
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +        Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 2, 1);
    +        TopologyDetails topology1 = new TopologyDetails("topology1", 
config, stormTopology1, 2, executorMap1, 0);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment1 = 
cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
    +        Set<String> nodesIDs1 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots1) {
    +            nodesIDs1.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors1 = 
assignment1.getExecutors();
    +        List<Double> assignedExecutorMemory = new ArrayList<>();
    +        List<Double> assignedExecutorCpu = new ArrayList<>();
    +        for (ExecutorDetails executor : executors1) {
    +            
assignedExecutorMemory.add(topology1.getTotalMemReqTask(executor));
    +            
assignedExecutorCpu.add(topology1.getTotalCpuReqTask(executor));
    +        }
    +        Collections.sort(assignedExecutorCpu);
    +        Collections.sort(assignedExecutorMemory);
    +
    +        Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new 
HashMap<>();
    +        Map<SupervisorDetails, List<ExecutorDetails>> 
supervisorToExecutors = new HashMap<>();
    +        Map<Double, Double> cpuAvailableToUsed = new HashMap();
    +        Map<Double, Double> memoryAvailableToUsed = new HashMap();
    +
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : 
assignment1.getExecutorToSlot().entrySet()) {
    +            executorToSupervisor.put(entry.getKey(), 
cluster.getSupervisorById(entry.getValue().getNodeId()));
    +        }
    +        for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : 
executorToSupervisor.entrySet()) {
    +            List<ExecutorDetails> executorsOnSupervisor = 
supervisorToExecutors.get(entry.getValue());
    +            if (executorsOnSupervisor == null) {
    +                executorsOnSupervisor = new ArrayList<>();
    +                supervisorToExecutors.put(entry.getValue(), 
executorsOnSupervisor);
    +            }
    +            executorsOnSupervisor.add(entry.getKey());
    +        }
    +        for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : 
supervisorToExecutors.entrySet()) {
    +            Double supervisorTotalCpu = entry.getKey().getTotalCPU();
    +            Double supervisorTotalMemory = entry.getKey().getTotalMemory();
    +            Double supervisorUsedCpu = 0.0;
    +            Double supervisorUsedMemory = 0.0;
    +            for (ExecutorDetails executor: entry.getValue()) {
    +                supervisorUsedMemory += 
topology1.getTotalCpuReqTask(executor);
    +                supervisorTotalCpu += 
topology1.getTotalMemReqTask(executor);
    +            }
    +            cpuAvailableToUsed.put(supervisorTotalCpu, supervisorUsedCpu);
    +            memoryAvailableToUsed.put(supervisorTotalMemory, 
supervisorUsedMemory);
    +        }
    +        // executor0 resides one one worker (on one), executor1 and 
executor2 on another worker (on the other node)
    +        Assert.assertEquals(2, assignedSlots1.size());
    +        Assert.assertEquals(2, nodesIDs1.size());
    +        Assert.assertEquals(3, executors1.size());
    +
    +        Assert.assertEquals(100.0, assignedExecutorCpu.get(0), 0.001);
    +        Assert.assertEquals(250.0, assignedExecutorCpu.get(1), 0.001);
    +        Assert.assertEquals(250.0, assignedExecutorCpu.get(2), 0.001);
    +        Assert.assertEquals(600.0, assignedExecutorMemory.get(0), 0.001);
    +        Assert.assertEquals(1200.0, assignedExecutorMemory.get(1), 0.001);
    +        Assert.assertEquals(1200.0, assignedExecutorMemory.get(2), 0.001);
    +
    +        for (Map.Entry<Double, Double> entry : 
memoryAvailableToUsed.entrySet()) {
    +            Assert.assertTrue(entry.getKey()- entry.getValue() >= 0);
    +        }
    +        for (Map.Entry<Double, Double> entry : 
cpuAvailableToUsed.entrySet()) {
    +            Assert.assertTrue(entry.getKey()- entry.getValue() >= 0);
    +        }
    +        Assert.assertEquals("Running - Fully Scheduled by 
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +    }
    +
    +    @Test
    +    public void testScheduleResilience() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder();
    +        builder1.setSpout("wordSpout1", new TestWordSpout(), 3);
    +        StormTopology stormTopology1 = builder1.createTopology();
    +        Config config1 = new Config();
    +        config1.putAll(defaultTopologyConf);
    +        Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 3, 0);
    +        TopologyDetails topology1 = new TopologyDetails("topology1", 
config1, stormTopology1, 3, executorMap1, 0);
    +
    +        TopologyBuilder builder2 = new TopologyBuilder();
    +        builder2.setSpout("wordSpout2", new TestWordSpout(), 2);
    +        StormTopology stormTopology2 = builder2.createTopology();
    +        Config config2 = new Config();
    +        config2.putAll(defaultTopologyConf);
    +        // memory requirement is large enough so that two executors can 
not be fully assigned to one node
    +        config2.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
1280.0);
    +        Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 2, 0);
    +        TopologyDetails topology2 = new TopologyDetails("topology2", 
config2, stormTopology2, 2, executorMap2, 0);
    +
    +        // Test1: When a worker fails, RAS does not alter existing 
assignments on healthy workers
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config1);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology2.getId(), topology2);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config1);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignmentImpl assignment = 
(SchedulerAssignmentImpl)cluster.getAssignmentById(topology2.getId());
    +        // pick a worker to mock as failed
    +        WorkerSlot failedWorker = new 
ArrayList<WorkerSlot>(assignment.getSlots()).get(0);
    +        Map<ExecutorDetails, WorkerSlot> executorToSlot = 
assignment.getExecutorToSlot();
    +        List<ExecutorDetails> failedExecutors = new ArrayList<>();
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : 
executorToSlot.entrySet()) {
    --- End diff --
    
    There is a method in SchedulerAssignmentImpl call getSlotToExecutors that 
you can use


> port  backtype.storm.scheduler.resource-aware-scheduler-test to java
> --------------------------------------------------------------------
>
>                 Key: STORM-1300
>                 URL: https://issues.apache.org/jira/browse/STORM-1300
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Zhuo Liu
>              Labels: java-migration, jstorm-merger
>
> Test RAS



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to