bipinprasad commented on a change in pull request #3378:
URL: https://github.com/apache/storm/pull/3378#discussion_r580274718



##########
File path: 
storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
##########
@@ -312,57 +321,93 @@ private static void createAndAddOneSupervisor(
     }
 
     /**
-     * Create supervisors.
+     * Create supervisors for a larger cluster configuration.
      *
-     * @param uniformSupervisors true if all supervisors are of the same size, 
false otherwise.
-     * @return supervisor details indexed by id
+     * @param reducedSupervisorsPerRack number of supervisors to reduce in 
rack.
+     * @return created supervisors.
      */
-    private static Map<String, SupervisorDetails> createSupervisors(boolean 
uniformSupervisors) {
-        Map<String, SupervisorDetails> retVal;
-        if (uniformSupervisors) {
-            int numRacks = 16;
-            int numSupersPerRack = 82;
-            int numPorts = 50; // note: scheduling is slower when components 
with large cpu/mem leave large percent of workerslots unused
-            int rackStart = 0;
-            int superInRackStart = 1;
-            double cpu = 7200; // %percent
-            double mem = 356_000; // MB
-            Map<String, Double> miscResources = new HashMap<>();
-            miscResources.put("network.resource.units", 100.0);
-
-            return TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(
-                    numRacks, numSupersPerRack, numPorts, rackStart, 
superInRackStart, cpu, mem, miscResources);
-
+    private static Map<String, SupervisorDetails> createSupervisors(int 
reducedSupervisorsPerRack) {
+        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
+            Collection<SupervisorDistribution> supervisorDistributions = 
SupervisorDistribution.getSupervisorDistribution02();
+            return createSupervisors(supervisorDistributions, 
reducedSupervisorsPerRack);
+        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
+            Collection<SupervisorDistribution> supervisorDistributions = 
SupervisorDistribution.getSupervisorDistribution03();
+            return createSupervisors(supervisorDistributions, 
reducedSupervisorsPerRack);
         } else {
-            // this non-uniform supervisor distribution closely (but not 
exactly) mimics a large cluster in use
-            int numSupersPerRack = 82;
-            int numPorts = 50;
-
-            Map<String, SupervisorDetails> retList = new HashMap<>();
+            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
+        }
+    }
 
-            for (int rack = 0 ; rack < 12 ; rack++) {
-                double cpu = 3600; // %percent
-                double mem = 178_000; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; 
superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * 
(superInRack % 2), mem, numPorts, retList);
-                }
+    /**
+     * Create supervisors based on a predefined supervisor distribution 
modeled after an existing
+     * large cluster in use.
+     *
+     * @param supervisorDistributions supervisor distribution to use.
+     * @param reducedSupervisorsPerRack number of supervisors to reduce per 
rack.
+     * @return created supervisors.
+     */
+    private static Map<String, SupervisorDetails> createSupervisors(
+        Collection<SupervisorDistribution> supervisorDistributions, int 
reducedSupervisorsPerRack) {
+        Map<String, Collection<SupervisorDistribution>> byRackId = 
SupervisorDistribution.mapByRackId(supervisorDistributions);
+        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, 
SupervisorDistribution.clusterCapacity(supervisorDistributions));
+
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        Map<String, AtomicInteger> seenRacks = new HashMap<>();
+        byRackId.forEach((rackId, list) -> {
+            int tmpRackSupervisorCnt = list.stream().mapToInt(x -> 
x.supervisorCnt).sum() - Math.abs(reducedSupervisorsPerRack);
+            if (tmpRackSupervisorCnt > Math.abs(reducedSupervisorsPerRack)) {
+                tmpRackSupervisorCnt -= Math.abs(reducedSupervisorsPerRack);
             }
-            for (int rack = 12 ; rack < 14 ; rack++) {
-                double cpu = 2400; // %percent
-                double mem = 118_100; // MB
-                for (int superInRack = 0; superInRack < numSupersPerRack ; 
superInRack++) {
-                    createAndAddOneSupervisor(rack, superInRack, cpu - 100 * 
(superInRack % 2), mem, numPorts, retList);
+            final int adjustedRackSupervisorCnt = tmpRackSupervisorCnt;
+            list.forEach(x -> {
+                int supervisorCnt = x.supervisorCnt;
+                for (int i = 0; i < supervisorCnt ; i++) {
+                    int superInRack = seenRacks.computeIfAbsent(rackId, z -> 
new AtomicInteger(-1)).incrementAndGet();
+                    int rackNum = seenRacks.size() - 1;
+                    if (superInRack >= adjustedRackSupervisorCnt) {
+                        continue;
+                    }
+                    createAndAddOneSupervisor(rackNum, superInRack, 
x.cpuPercent, x.memoryMb, x.slotCnt, retList);

Review comment:
       The supervisor distribution can contain repeated entries for the same 
rack id (a string). The supervisor id is generated from rackNum (an int) and 
must follow a fixed, predefined pattern that can be reliably parsed to generate 
the host -> rack mapping (used in the Cluster DNS mapping).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to