[ 
https://issues.apache.org/jira/browse/STORM-1300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15212335#comment-15212335
 ] 

ASF GitHub Bot commented on STORM-1300:
---------------------------------------

Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1232#discussion_r57485480
  
    --- Diff: 
storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
 ---
    @@ -54,6 +61,681 @@
     
         private static int currentTime = 1450418597;
     
    +    private static final Config defaultTopologyConf = new Config();
    +
    +    @BeforeClass
    +    public static void initConf() {
    +        defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, 
"org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
    +        
defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, 
org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
    +        
defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, 
org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
    +
    +        defaultTopologyConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, 
org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
    +        
defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
    +        
defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
128.0);
    +        
defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
0.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 
8192.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
    +    }
    +
    +    @Test
    +    public void testRASNodeSlotAssign() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, resourceMap);
    +        Topologies topologies = new Topologies(new HashMap<String, 
TopologyDetails>());
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), new HashMap());
    +        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, 
topologies);
    +        Assert.assertEquals(5, nodes.size());
    +        RAS_Node node = nodes.get("sup-0");
    +
    +        Assert.assertEquals("sup-0", node.getId());
    +        Assert.assertTrue(node.isAlive());
    +        Assert.assertEquals(0, node.getRunningTopologies().size());
    +        Assert.assertTrue(node.isTotallyFree());
    +        Assert.assertEquals(4, node.totalSlotsFree());
    +        Assert.assertEquals(0, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 
0, 2, 0, 0, 0);
    +
    +        List<ExecutorDetails> executors11 = new ArrayList<>();
    +        executors11.add(new ExecutorDetails(1, 1));
    +        node.assign(node.getFreeSlots().iterator().next(), topology1, 
executors11);
    +        Assert.assertEquals(1, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(3, node.totalSlotsFree());
    +        Assert.assertEquals(1, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        List<ExecutorDetails> executors12 = new ArrayList<>();
    +        executors12.add(new ExecutorDetails(2, 2));
    +        node.assign(node.getFreeSlots().iterator().next(), topology1, 
executors12);
    +        Assert.assertEquals(1, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(2, node.totalSlotsFree());
    +        Assert.assertEquals(2, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        TopologyDetails topology2 = 
TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 
0, 2, 0, 0, 0);
    +
    +        List<ExecutorDetails> executors21 = new ArrayList<>();
    +        executors21.add(new ExecutorDetails(1, 1));
    +        node.assign(node.getFreeSlots().iterator().next(), topology2, 
executors21);
    +        Assert.assertEquals(2, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(1, node.totalSlotsFree());
    +        Assert.assertEquals(3, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        List<ExecutorDetails> executors22 = new ArrayList<>();
    +        executors22.add(new ExecutorDetails(2, 2));
    +        node.assign(node.getFreeSlots().iterator().next(), topology2, 
executors22);
    +        Assert.assertEquals(2, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(0, node.totalSlotsFree());
    +        Assert.assertEquals(4, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        node.freeAllSlots();
    +        Assert.assertEquals(0, node.getRunningTopologies().size());
    +        Assert.assertTrue(node.isTotallyFree());
    +        Assert.assertEquals(4, node.totalSlotsFree());
    +        Assert.assertEquals(0, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +    }
    +
    +    @Test
    +    public void sanityTestOfScheduling() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(1, 2, resourceMap);
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 
0, 0);
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment = 
cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots = assignment.getSlots();
    +        Set<String> nodesIDs = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots) {
    +            nodesIDs.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors = assignment.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots.size());
    +        Assert.assertEquals(1, nodesIDs.size());
    +        Assert.assertEquals(2, executors.size());
    +        Assert.assertEquals("Running - Fully Scheduled by 
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +    }
    +
    +    @Test
    +    public void testTopologyWithMultipleSpouts() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(2, 4, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology 
with multiple spouts
    +        builder1.setSpout("wordSpout1", new TestWordSpout(), 1);
    +        builder1.setSpout("wordSpout2", new TestWordSpout(), 1);
    +        builder1.setBolt("wordCountBolt1", new TestWordCounter(), 
1).shuffleGrouping("wordSpout1").shuffleGrouping("wordSpout2");
    +        builder1.setBolt("wordCountBolt2", new TestWordCounter(), 
1).shuffleGrouping("wordCountBolt1");
    +        builder1.setBolt("wordCountBolt3", new TestWordCounter(), 
1).shuffleGrouping("wordCountBolt1");
    +        builder1.setBolt("wordCountBolt4", new TestWordCounter(), 
1).shuffleGrouping("wordCountBolt2");
    +        builder1.setBolt("wordCountBolt5", new TestWordCounter(), 
1).shuffleGrouping("wordSpout2");
    +        StormTopology stormTopology1 = builder1.createTopology();
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +        Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1);
    +        TopologyDetails topology1 = new TopologyDetails("topology1", 
config, stormTopology1, 0, executorMap1, 0);
    +
    +        TopologyBuilder builder2 = new TopologyBuilder(); // a topology 
with two unconnected partitions
    +        builder2.setSpout("wordSpoutX", new TestWordSpout(), 1);
    +        builder2.setSpout("wordSpoutY", new TestWordSpout(), 1);
    +        StormTopology stormTopology2 = builder2.createTopology();
    +        Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 1, 0);
    +        TopologyDetails topology2 = new TopologyDetails("topology2", 
config, stormTopology2, 0, executorMap2, 0);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        topoMap.put(topology2.getId(), topology2);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment1 = 
cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
    +        Set<String> nodesIDs1 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots1) {
    +            nodesIDs1.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors1 = 
assignment1.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots1.size());
    +        Assert.assertEquals(1, nodesIDs1.size());
    +        Assert.assertEquals(7, executors1.size());
    +        Assert.assertEquals("Running - Fully Scheduled by 
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +
    +        SchedulerAssignment assignment2 = 
cluster.getAssignmentById(topology2.getId());
    +        Set<WorkerSlot> assignedSlots2 = assignment2.getSlots();
    +        Set<String> nodesIDs2 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots2) {
    +            nodesIDs2.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors2 = 
assignment2.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots2.size());
    +        Assert.assertEquals(1, nodesIDs2.size());
    +        Assert.assertEquals(2, executors2.size());
    +        Assert.assertEquals("Running - Fully Scheduled by 
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
    +    }
    +
    +    @Test
    +    public void testTopologySetCpuAndMemLoad() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology 
with multiple spouts
    +        builder1.setSpout("wordSpout", new TestWordSpout(), 
1).setCPULoad(20.0).setMemoryLoad(200.0);
    --- End diff --
    
    In the original clojure code, we did not set the resource requirement for 
the spout.  Is there a reason why we are setting it here


> port  backtype.storm.scheduler.resource-aware-scheduler-test to java
> --------------------------------------------------------------------
>
>                 Key: STORM-1300
>                 URL: https://issues.apache.org/jira/browse/STORM-1300
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Zhuo Liu
>              Labels: java-migration, jstorm-merger
>
> Test RAS



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to