Ethanlm commented on a change in pull request #3096: STORM-3480 Implement One 
Worker Per Executor RAS Option
URL: https://github.com/apache/storm/pull/3096#discussion_r311742217
 
 

 ##########
 File path: 
storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
 ##########
 @@ -93,31 +104,119 @@ public TestDNSToSwitchMapping(Map<String, 
SupervisorDetails> ... racks) {
         }
     };
 
-    @Rule
-    public NormalizedResourcesRule nrRule = new NormalizedResourcesRule();
-
-    @After
+    @AfterEach
     public void cleanup() {
         if (scheduler != null) {
             scheduler.cleanup();
             scheduler = null;
         }
     }
 
+    /*
+     * test assigned memory with shared memory types and oneWorkerPerExecutor
+     */
+    @ParameterizedTest
+    @EnumSource(TestDefaultResourceAwareStrategy.sharedMemoryTypes.class)
+    public void 
testMultipleSharedMemoryWithOneWorkerPerExecutor(sharedMemoryTypes memoryType) {
+        int spoutParallelism = 4;
+        double cpuPercent = 10;
+        double memoryOnHeap = 10;
+        double memoryOffHeap = 10;
+        double sharedOnHeap = 450;
+        double sharedOffHeapNode = 600;
+        double sharedOffHeapWorker = 400;
+
+        TopologyBuilder builder = new TopologyBuilder();
+        switch (memoryType) {
+            case SHARED_OFF_HEAP_NODE_TYPE:
+                builder.setSpout("spout", new TestSpout(), spoutParallelism)
+                        .addSharedMemory(new 
SharedOffHeapWithinNode(sharedOffHeapNode, "spout shared node"));
+                break;
+            case SHARED_OFF_HEAP_WORKER_TYPE:
+                builder.setSpout("spout", new TestSpout(), spoutParallelism)
+                        .addSharedMemory(new 
SharedOffHeapWithinWorker(sharedOffHeapWorker, "spout shared off heap worker"));
+                break;
+            case SHARED_ON_HEAP_TYPE:
+                builder.setSpout("spout", new TestSpout(), spoutParallelism)
+                        .addSharedMemory(new SharedOnHeap(sharedOnHeap, "spout 
shared worker"));
+                break;
+        }
+        StormTopology stormToplogy = builder.createTopology();
+        INimbus iNimbus = new INimbusTest();
+        Map<String, SupervisorDetails> supMap = genSupervisors(4, 4, 500, 
1000);
+        Config conf = createClusterConfig(cpuPercent, memoryOnHeap, 
memoryOffHeap, null);
+
+        conf.put(Config.TOPOLOGY_PRIORITY, 0);
+        conf.put(Config.TOPOLOGY_NAME, "testTopology");
+        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
+        conf.put(Config.TOPOLOGY_RAS_ONE_WORKER_PER_EXECUTOR, true);
+        TopologyDetails topo = new TopologyDetails("testTopology-id", conf, 
stormToplogy, 0,
+                genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
+
+        Topologies topologies = new Topologies(topo);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new 
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
+
+        scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(conf);
+        scheduler.schedule(topologies, cluster);
+
+        TopologyResources topologyResources = 
cluster.getTopologyResourcesMap().get(topo.getId());
+        SchedulerAssignment assignment = 
cluster.getAssignmentById(topo.getId());
+        long numNodes = 
assignment.getSlotToExecutors().keySet().stream().map(ws -> 
ws.getNodeId()).distinct().count();
+
+        switch (memoryType) {
+            case SHARED_OFF_HEAP_NODE_TYPE:
+                // 4 workers on single node. OffHeapNode memory is shared
+                assertThat(topologyResources.getAssignedMemOnHeap(), 
closeTo(spoutParallelism * memoryOnHeap, 0.01));
+                assertThat(topologyResources.getAssignedMemOffHeap(), 
closeTo(spoutParallelism * memoryOffHeap + sharedOffHeapNode, 0.01));
+                assertThat(topologyResources.getAssignedSharedMemOnHeap(), 
closeTo(0, 0.01));
+                assertThat(topologyResources.getAssignedSharedMemOffHeap(), 
closeTo(sharedOffHeapNode, 0.01));
+                assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), 
closeTo(spoutParallelism * memoryOnHeap, 0.01));
+                assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), 
closeTo(spoutParallelism * memoryOffHeap, 0.01));
+                assertThat(numNodes, is(1L));
+                assertThat(cluster.getAssignedNumWorkers(topo), 
is(spoutParallelism));
+                break;
+            case SHARED_OFF_HEAP_WORKER_TYPE:
+                // 4 workers on 2 nodes. OffHeapWorker memory not shared -- 
consumed 4x, once for each worker)
+                assertThat(topologyResources.getAssignedMemOnHeap(), 
closeTo(spoutParallelism * memoryOnHeap, 0.01));
+                assertThat(topologyResources.getAssignedMemOffHeap(), 
closeTo(spoutParallelism * (memoryOffHeap + sharedOffHeapWorker), 0.01));
+                assertThat(topologyResources.getAssignedSharedMemOnHeap(), 
closeTo(0, 0.01));
+                assertThat(topologyResources.getAssignedSharedMemOffHeap(), 
closeTo(spoutParallelism * sharedOffHeapWorker, 0.01));
+                assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), 
closeTo(spoutParallelism * memoryOnHeap, 0.01));
+                assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), 
closeTo(spoutParallelism * memoryOffHeap, 0.01));
+                assertThat(numNodes, is(2L));
+                assertThat(cluster.getAssignedNumWorkers(topo), 
is(spoutParallelism));
+                break;
+            case SHARED_ON_HEAP_TYPE:
+                // 4 workers on 2 nodes. onHeap memory not shared -- consumed 
4x, once for each worker
+                assertThat(topologyResources.getAssignedMemOnHeap(), 
closeTo(spoutParallelism * (memoryOnHeap + sharedOnHeap), 0.01));
+                assertThat(topologyResources.getAssignedMemOffHeap(), 
closeTo(spoutParallelism * memoryOffHeap, 0.01));
+                assertThat(topologyResources.getAssignedSharedMemOnHeap(), 
closeTo(spoutParallelism * sharedOnHeap, 0.01));
+                assertThat(topologyResources.getAssignedSharedMemOffHeap(), 
closeTo(0, 0.01));
+                assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), 
closeTo(spoutParallelism * memoryOnHeap, 0.01));
+                assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), 
closeTo(spoutParallelism * memoryOffHeap, 0.01));
+                assertThat(numNodes, is(2L));
+                assertThat(cluster.getAssignedNumWorkers(topo), 
is(spoutParallelism));
+                break;
+        }
+    }
+
     /**
-     * test if the scheduling logic for the DefaultResourceAwareStrategy is 
correct
+     * test if the scheduling shared memory is correct with/without 
oneWorkerPerExecutor enabled
      */
-    @Test
-    public void testDefaultResourceAwareStrategySharedMemory() {
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
 
 Review comment:
   I think we don't need space here. It can just be `{true, false}`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to