[
https://issues.apache.org/jira/browse/STORM-1300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15212333#comment-15212333
]
ASF GitHub Bot commented on STORM-1300:
---------------------------------------
Github user jerrypeng commented on a diff in the pull request:
https://github.com/apache/storm/pull/1232#discussion_r57485387
--- Diff:
storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
---
@@ -54,6 +61,681 @@
private static int currentTime = 1450418597;
+ private static final Config defaultTopologyConf = new Config();
+
+ @BeforeClass
+ public static void initConf() {
+ defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN,
"org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
+
defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY,
org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+
defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY,
org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+
+ defaultTopologyConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY,
org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+
defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
+
defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
128.0);
+
defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
0.0);
+ defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB,
8192.0);
+ defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0);
+ defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
+ }
+
+ @Test
+ public void testRASNodeSlotAssign() {
+ INimbus iNimbus = new
TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+ Map<String, SupervisorDetails> supMap =
TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, resourceMap);
+ Topologies topologies = new Topologies(new HashMap<String,
TopologyDetails>());
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String,
SchedulerAssignmentImpl>(), new HashMap());
+ Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster,
topologies);
+ Assert.assertEquals(5, nodes.size());
+ RAS_Node node = nodes.get("sup-0");
+
+ Assert.assertEquals("sup-0", node.getId());
+ Assert.assertTrue(node.isAlive());
+ Assert.assertEquals(0, node.getRunningTopologies().size());
+ Assert.assertTrue(node.isTotallyFree());
+ Assert.assertEquals(4, node.totalSlotsFree());
+ Assert.assertEquals(0, node.totalSlotsUsed());
+ Assert.assertEquals(4, node.totalSlots());
+
+ TopologyDetails topology1 =
TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1,
0, 2, 0, 0, 0);
+
+ List<ExecutorDetails> executors11 = new ArrayList<>();
+ executors11.add(new ExecutorDetails(1, 1));
+ node.assign(node.getFreeSlots().iterator().next(), topology1,
executors11);
+ Assert.assertEquals(1, node.getRunningTopologies().size());
+ Assert.assertFalse(node.isTotallyFree());
+ Assert.assertEquals(3, node.totalSlotsFree());
+ Assert.assertEquals(1, node.totalSlotsUsed());
+ Assert.assertEquals(4, node.totalSlots());
+
+ List<ExecutorDetails> executors12 = new ArrayList<>();
+ executors12.add(new ExecutorDetails(2, 2));
+ node.assign(node.getFreeSlots().iterator().next(), topology1,
executors12);
+ Assert.assertEquals(1, node.getRunningTopologies().size());
+ Assert.assertFalse(node.isTotallyFree());
+ Assert.assertEquals(2, node.totalSlotsFree());
+ Assert.assertEquals(2, node.totalSlotsUsed());
+ Assert.assertEquals(4, node.totalSlots());
+
+ TopologyDetails topology2 =
TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1,
0, 2, 0, 0, 0);
+
+ List<ExecutorDetails> executors21 = new ArrayList<>();
+ executors21.add(new ExecutorDetails(1, 1));
+ node.assign(node.getFreeSlots().iterator().next(), topology2,
executors21);
+ Assert.assertEquals(2, node.getRunningTopologies().size());
+ Assert.assertFalse(node.isTotallyFree());
+ Assert.assertEquals(1, node.totalSlotsFree());
+ Assert.assertEquals(3, node.totalSlotsUsed());
+ Assert.assertEquals(4, node.totalSlots());
+
+ List<ExecutorDetails> executors22 = new ArrayList<>();
+ executors22.add(new ExecutorDetails(2, 2));
+ node.assign(node.getFreeSlots().iterator().next(), topology2,
executors22);
+ Assert.assertEquals(2, node.getRunningTopologies().size());
+ Assert.assertFalse(node.isTotallyFree());
+ Assert.assertEquals(0, node.totalSlotsFree());
+ Assert.assertEquals(4, node.totalSlotsUsed());
+ Assert.assertEquals(4, node.totalSlots());
+
+ node.freeAllSlots();
+ Assert.assertEquals(0, node.getRunningTopologies().size());
+ Assert.assertTrue(node.isTotallyFree());
+ Assert.assertEquals(4, node.totalSlotsFree());
+ Assert.assertEquals(0, node.totalSlotsUsed());
+ Assert.assertEquals(4, node.totalSlots());
+ }
+
+ @Test
+ public void sanityTestOfScheduling() {
+ INimbus iNimbus = new
TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+ Map<String, SupervisorDetails> supMap =
TestUtilsForResourceAwareScheduler.genSupervisors(1, 2, resourceMap);
+
+ Config config = new Config();
+ config.putAll(defaultTopologyConf);
+
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String,
SchedulerAssignmentImpl>(), config);
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ TopologyDetails topology1 =
TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1,
0, 0);
+ Map<String, TopologyDetails> topoMap = new HashMap<>();
+ topoMap.put(topology1.getId(), topology1);
+ Topologies topologies = new Topologies(topoMap);
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ SchedulerAssignment assignment =
cluster.getAssignmentById(topology1.getId());
+ Set<WorkerSlot> assignedSlots = assignment.getSlots();
+ Set<String> nodesIDs = new HashSet<>();
+ for (WorkerSlot slot : assignedSlots) {
+ nodesIDs.add(slot.getNodeId());
+ }
+ Collection<ExecutorDetails> executors = assignment.getExecutors();
+
+ Assert.assertEquals(1, assignedSlots.size());
+ Assert.assertEquals(1, nodesIDs.size());
+ Assert.assertEquals(2, executors.size());
+ Assert.assertEquals("Running - Fully Scheduled by
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+ }
+
+ @Test
+ public void testTopologyWithMultipleSpouts() {
+ INimbus iNimbus = new
TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+ Map<String, SupervisorDetails> supMap =
TestUtilsForResourceAwareScheduler.genSupervisors(2, 4, resourceMap);
+
+ TopologyBuilder builder1 = new TopologyBuilder(); // a topology
with multiple spouts
+ builder1.setSpout("wordSpout1", new TestWordSpout(), 1);
+ builder1.setSpout("wordSpout2", new TestWordSpout(), 1);
+ builder1.setBolt("wordCountBolt1", new TestWordCounter(),
1).shuffleGrouping("wordSpout1").shuffleGrouping("wordSpout2");
+ builder1.setBolt("wordCountBolt2", new TestWordCounter(),
1).shuffleGrouping("wordCountBolt1");
+ builder1.setBolt("wordCountBolt3", new TestWordCounter(),
1).shuffleGrouping("wordCountBolt1");
+ builder1.setBolt("wordCountBolt4", new TestWordCounter(),
1).shuffleGrouping("wordCountBolt2");
+ builder1.setBolt("wordCountBolt5", new TestWordCounter(),
1).shuffleGrouping("wordSpout2");
+ StormTopology stormTopology1 = builder1.createTopology();
+
+ Config config = new Config();
+ config.putAll(defaultTopologyConf);
+ Map<ExecutorDetails, String> executorMap1 =
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1);
+ TopologyDetails topology1 = new TopologyDetails("topology1",
config, stormTopology1, 0, executorMap1, 0);
+
+ TopologyBuilder builder2 = new TopologyBuilder(); // a topology
with two unconnected partitions
+ builder2.setSpout("wordSpoutX", new TestWordSpout(), 1);
+ builder2.setSpout("wordSpoutY", new TestWordSpout(), 1);
+ StormTopology stormTopology2 = builder2.createTopology();
+ Map<ExecutorDetails, String> executorMap2 =
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 1, 0);
+ TopologyDetails topology2 = new TopologyDetails("topology2",
config, stormTopology2, 0, executorMap2, 0);
+
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String,
SchedulerAssignmentImpl>(), config);
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ Map<String, TopologyDetails> topoMap = new HashMap<>();
+ topoMap.put(topology1.getId(), topology1);
+ topoMap.put(topology2.getId(), topology2);
+ Topologies topologies = new Topologies(topoMap);
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ SchedulerAssignment assignment1 =
cluster.getAssignmentById(topology1.getId());
+ Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
+ Set<String> nodesIDs1 = new HashSet<>();
+ for (WorkerSlot slot : assignedSlots1) {
+ nodesIDs1.add(slot.getNodeId());
+ }
+ Collection<ExecutorDetails> executors1 =
assignment1.getExecutors();
+
+ Assert.assertEquals(1, assignedSlots1.size());
+ Assert.assertEquals(1, nodesIDs1.size());
+ Assert.assertEquals(7, executors1.size());
+ Assert.assertEquals("Running - Fully Scheduled by
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+
+ SchedulerAssignment assignment2 =
cluster.getAssignmentById(topology2.getId());
+ Set<WorkerSlot> assignedSlots2 = assignment2.getSlots();
+ Set<String> nodesIDs2 = new HashSet<>();
+ for (WorkerSlot slot : assignedSlots2) {
+ nodesIDs2.add(slot.getNodeId());
+ }
+ Collection<ExecutorDetails> executors2 =
assignment2.getExecutors();
+
+ Assert.assertEquals(1, assignedSlots2.size());
+ Assert.assertEquals(1, nodesIDs2.size());
+ Assert.assertEquals(2, executors2.size());
+ Assert.assertEquals("Running - Fully Scheduled by
DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
+ }
+
+ @Test
+ public void testTopologySetCpuAndMemLoad() {
+ INimbus iNimbus = new
TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+ Map<String, SupervisorDetails> supMap =
TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
+
+ TopologyBuilder builder1 = new TopologyBuilder(); // a topology
with multiple spouts
+ builder1.setSpout("wordSpout", new TestWordSpout(),
1).setCPULoad(20.0).setMemoryLoad(200.0);
+ builder1.setBolt("wordCountBolt", new TestWordCounter(),
1).shuffleGrouping("wordSpout").setCPULoad(20.0).setMemoryLoad(200.0);
--- End diff --
In the original clojure code with set the the memory load to be 110.0. Is
there a reason why we changed this?
> port backtype.storm.scheduler.resource-aware-scheduler-test to java
> --------------------------------------------------------------------
>
> Key: STORM-1300
> URL: https://issues.apache.org/jira/browse/STORM-1300
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Zhuo Liu
> Labels: java-migration, jstorm-merger
>
> Test RAS
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)