mridulm commented on code in PR #2126:
URL: 
https://github.com/apache/incubator-celeborn/pull/2126#discussion_r1412845908


##########
master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java:
##########
@@ -140,4 +148,310 @@ public void accept(WorkerInfo workerInfo) {
         });
     return workers;
   }
+
+  @Test
+  public void testRackAwareRoundRobinReplicaPatterns() {
+    for (Tuple2<Integer, List<WorkerInfo>> tuple :
+        Arrays.asList(
+            // This is a specific case we analyzed which was resulting in 
degenerate
+            // behavior for replicas
+            Tuple2.apply(
+                100,
+                Arrays.asList(
+                    // h1r1, h2r1, h3r1, h4r2, h5r2, h6r2
+                    WorkersSupplier.initWorker("h1r1", "/rack/r1"),
+                    WorkersSupplier.initWorker("h2r1", "/rack/r1"),
+                    WorkersSupplier.initWorker("h3r1", "/rack/r1"),
+                    WorkersSupplier.initWorker("h4r2", "/rack/r2"),
+                    WorkersSupplier.initWorker("h5r2", "/rack/r2"),
+                    WorkersSupplier.initWorker("h6r2", "/rack/r2"))),
+            // This is a specific case we observed in production which 
triggered
+            // suboptimal replica selection
+            Tuple2.apply(
+                20,
+                Arrays.asList(
+                    // h1r2, h4r2, h3r1, h1r1, h4r1, h2r1, h3r2, h2r2, h5r2, 
h5r1
+                    WorkersSupplier.initWorker("h1r2", "/rack/r2"),
+                    WorkersSupplier.initWorker("h4r2", "/rack/r2"),
+                    WorkersSupplier.initWorker("h3r1", "/rack/r1"),
+                    WorkersSupplier.initWorker("h1r1", "/rack/r1"),
+                    WorkersSupplier.initWorker("h4r1", "/rack/r1"),
+                    WorkersSupplier.initWorker("h2r1", "/rack/r1"),
+                    WorkersSupplier.initWorker("h3r2", "/rack/r2"),
+                    WorkersSupplier.initWorker("h2r2", "/rack/r2"),
+                    WorkersSupplier.initWorker("h5r2", "/rack/r2"),
+                    WorkersSupplier.initWorker("h5r1", "/rack/r1"))))) {
+
+      int numPartitions = tuple._1();
+      List<WorkerInfo> workers = tuple._2();
+      int maxReplicaCount = (numPartitions / workers.size()) + 1;
+      List<Integer> partitionIds =
+          IntStream.range(0, 
numPartitions).boxed().collect(Collectors.toList());
+
+      Map<WorkerInfo, Tuple2<List<PartitionLocation>, 
List<PartitionLocation>>> slots =
+          SlotsAllocator.offerSlotsRoundRobin(
+              workers, partitionIds, true, true, 
StorageInfo.ALL_TYPES_AVAILABLE_MASK);
+
+      Map<String, Long> numReplicaPerHost =
+          slots.entrySet().stream()
+              .flatMap(v -> 
v.getValue()._2.stream().map(PartitionLocation::getHost))
+              .collect(Collectors.groupingBy(Function.identity(), 
Collectors.counting()));
+
+      for (String host : 
workers.stream().map(WorkerInfo::host).collect(Collectors.toList())) {
+        long numReplicas = numReplicaPerHost.getOrDefault(host, 0L);
+        Assert.assertTrue(
+            "host = " + host + ", numReplicaPerHost = " + numReplicaPerHost + 
", slots = " + slots,
+            numReplicas <= maxReplicaCount);
+      }
+    }
+  }
+
+  @Test
+  public void testRackAwareRoundRobinReplicaDistribution() {
+
+    List<SlotReplicaAllocatorTestCase> allTests = 
getSlotReplicaAllocatorTestCases();
+
+    for (final SlotReplicaAllocatorTestCase test : allTests) {
+
+      final int numPartitions = test.getNumPartitions();
+      long maxValue = Long.MIN_VALUE;
+      List<WorkerInfo> maxValueWorkers = null;
+      Map<String, Long> maxValueNumReplicaPerHost = null;
+      List<Integer> partitionIds =
+          Collections.unmodifiableList(
+              IntStream.range(0, 
numPartitions).boxed().collect(Collectors.toList()));
+
+      // For each test, run NUM_ATTEMPTS times and pick the worst case.
+      for (int attempt = 0; attempt < NUM_ATTEMPTS; attempt++) {
+
+        // workers will be randomized
+        List<WorkerInfo> workers = test.generateWorkers();
+
+        Map<WorkerInfo, Tuple2<List<PartitionLocation>, 
List<PartitionLocation>>> slots =
+            SlotsAllocator.offerSlotsRoundRobin(
+                workers, partitionIds, true, true, 
StorageInfo.ALL_TYPES_AVAILABLE_MASK);
+
+        Map<String, Long> numReplicaPerHost =
+            slots.entrySet().stream()
+                .flatMap(v -> 
v.getValue()._2.stream().map(PartitionLocation::getHost))
+                .collect(Collectors.groupingBy(Function.identity(), 
Collectors.counting()));
+
+        Comparator<Map.Entry<String, Long>> entryComparator =
+            Comparator.comparingLong(Map.Entry::getValue);
+
+        Map.Entry<String, Long> maxEntry =
+            Collections.max(numReplicaPerHost.entrySet(), entryComparator);
+
+        if (maxEntry.getValue() > maxValue) {
+          maxValue = maxEntry.getValue();
+          maxValueWorkers = workers;
+          maxValueNumReplicaPerHost = numReplicaPerHost;
+        }
+      }
+
+      Assert.assertNotNull(maxValueWorkers);
+
+      for (String host :
+          
maxValueWorkers.stream().map(WorkerInfo::host).collect(Collectors.toList())) {
+        long numReplicas = maxValueNumReplicaPerHost.getOrDefault(host, 0L);
+        Assert.assertTrue(
+            "host = "
+                + host
+                + ", workerHosts = "
+                + 
maxValueWorkers.stream().map(WorkerInfo::host).collect(Collectors.toList())
+                + ", replicaHosts = "
+                + 
SlotsAllocator.generateRackAwareWorkers(maxValueWorkers).stream()
+                    .map(WorkerInfo::host)
+                    .collect(Collectors.toList())
+                + ", numReplicaPerHost = "
+                + maxValueNumReplicaPerHost
+                + ", test = "
+                + test,
+            numReplicas <= test.getExpectedMaxSlotsPerHost());
+      }
+    }
+  }
+
+  private static List<SlotReplicaAllocatorTestCase> 
getSlotReplicaAllocatorTestCases() {
+    List<SlotReplicaAllocatorTestCase> equalHostsPerRackTests =
+        // equal number of hosts per rack
+        Arrays.asList(
+            new SlotReplicaAllocatorTestCase(5, 2, 50),
+            new SlotReplicaAllocatorTestCase(5, 2, 100),
+            new SlotReplicaAllocatorTestCase(5, 2, 200),
+            new SlotReplicaAllocatorTestCase(5, 2, 1000),
+            new SlotReplicaAllocatorTestCase(5, 2, 10000),
+            new SlotReplicaAllocatorTestCase(5, 2, 50000),
+            new SlotReplicaAllocatorTestCase(5, 3, 50),
+            new SlotReplicaAllocatorTestCase(5, 3, 100),
+            new SlotReplicaAllocatorTestCase(5, 3, 200),
+            new SlotReplicaAllocatorTestCase(5, 3, 1000),
+            new SlotReplicaAllocatorTestCase(5, 3, 10000),
+            new SlotReplicaAllocatorTestCase(5, 3, 50000),
+            new SlotReplicaAllocatorTestCase(10, 5, 200),
+            new SlotReplicaAllocatorTestCase(10, 5, 1000),
+            new SlotReplicaAllocatorTestCase(10, 5, 10000),
+            new SlotReplicaAllocatorTestCase(10, 5, 50000),
+            new SlotReplicaAllocatorTestCase(20, 10, 1000),
+            new SlotReplicaAllocatorTestCase(20, 10, 10000),
+            new SlotReplicaAllocatorTestCase(20, 10, 50000));
+
+    List<SlotReplicaAllocatorTestCase> unequalHostsPerRackTests =
+        Arrays.asList(
+            // specified number of hosts per rack
+            new SlotReplicaAllocatorTestCase(Arrays.asList(3, 7), 50),
+            new SlotReplicaAllocatorTestCase(Arrays.asList(3, 7), 100),
+            new SlotReplicaAllocatorTestCase(Arrays.asList(3, 7), 200),
+            new SlotReplicaAllocatorTestCase(Arrays.asList(3, 7), 1000),
+            new SlotReplicaAllocatorTestCase(Arrays.asList(3, 7), 10000),
+            new SlotReplicaAllocatorTestCase(Arrays.asList(3, 7), 50000),
+            new SlotReplicaAllocatorTestCase(Arrays.asList(2, 3, 5), 100),
+            new SlotReplicaAllocatorTestCase(Arrays.asList(2, 3, 5), 50000),
+            new SlotReplicaAllocatorTestCase(Arrays.asList(2, 3, 5, 5, 10), 
100),
+            new SlotReplicaAllocatorTestCase(Arrays.asList(2, 3, 5, 5, 10), 
50000));
+
+    List<SlotReplicaAllocatorTestCase> allTests = new ArrayList<>();
+    allTests.addAll(equalHostsPerRackTests);
+    allTests.addAll(unequalHostsPerRackTests);
+    return allTests;
+  }
+
+  private static class WorkersSupplier implements Supplier<List<WorkerInfo>> {
+    private static final Random RAND = new Random(42);
+
+    private final String message;
+    private final List<WorkerInfo> workerList;
+
+    WorkersSupplier(int numHostsPerRack, int numRacks) {
+      this(
+          String.format("Equal hosts(%d) per rack(%d)", numHostsPerRack, 
numRacks),
+          equalHostsPerRackWorkers(numHostsPerRack, numRacks));
+    }
+
+    WorkersSupplier(List<Integer> hostsPerRack) {
+      this("Hosts per rack = " + hostsPerRack, 
specifiedHostsPerRackWorkers(hostsPerRack));
+    }
+
+    WorkersSupplier(String message, List<WorkerInfo> workerList) {
+      this.message = message;
+      this.workerList = Collections.unmodifiableList(workerList);
+    }
+
+    private static List<WorkerInfo> equalHostsPerRackWorkers(int 
numHostsPerRack, int numRacks) {
+      List<WorkerInfo> workers = new ArrayList<>(numRacks * numHostsPerRack);
+      for (int rack = 1; rack <= numRacks; rack++) {
+        final String rackName = "/rack/r" + rack;
+        for (int host = 1; host <= numHostsPerRack; host++) {
+          final String hostName = "h" + host + "r" + rack;
+          workers.add(initWorker(hostName, rackName));
+        }
+      }
+      return workers;
+    }
+
+    private static List<WorkerInfo> specifiedHostsPerRackWorkers(List<Integer> 
hostsPerRack) {
+      List<WorkerInfo> workers = new ArrayList<>();
+
+      int rack = 0;
+      for (int numHosts : hostsPerRack) {
+        rack++;
+        final String rackName = "/rack/r" + rack;
+        for (int host = 1; host <= numHosts; host++) {
+          final String hostName = "h" + host + "r" + rack;
+          workers.add(initWorker(hostName, rackName));
+        }
+      }
+
+      return workers;
+    }
+
+    @Override
+    public List<WorkerInfo> get() {
+      List<WorkerInfo> result = new ArrayList<>(workerList);
+      Collections.shuffle(result, RAND);
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return "WorkersSupplier{" + message + "}";
+    }
+
+    private static WorkerInfo initWorker(String host, String rack) {
+      long assumedPartitionSize = 64 * 1024 * 1024;
+      DiskInfo diskInfo = new DiskInfo("/mnt/a", 1024L * 1024L * 1024L * 
1024L, 1, 1, 0);
+      diskInfo.maxSlots_$eq(diskInfo.actualUsableSpace() / 
assumedPartitionSize);
+
+      Map<String, DiskInfo> diskInfoMap = new HashMap<>(2);
+      diskInfoMap.put(diskInfo.mountPoint(), diskInfo);
+
+      WorkerInfo workerInfo =
+          new WorkerInfo(host, 1, 2, 3, 4, 
Collections.unmodifiableMap(diskInfoMap), null);
+      workerInfo.networkLocation_$eq(rack);
+      return workerInfo;
+    }
+  }
+
+  private static class SlotReplicaAllocatorTestCase {
+
+    private final Supplier<List<WorkerInfo>> workersSupplier;
+    // Total number of partitions
+    private final int numPartitions;
+
+    // The maximum number of slots per replica we are expecting
+    private final int expectedMaxSlotsPerHost;
+
+    SlotReplicaAllocatorTestCase(int numHostsPerRack, int numRacks, int 
numPartitions) {
+      this(
+          new WorkersSupplier(numHostsPerRack, numRacks),
+          numPartitions,
+          (int) Math.ceil(((double) numPartitions) / (numHostsPerRack * 
numRacks)));
+    }
+
+    SlotReplicaAllocatorTestCase(List<Integer> hostsPerRack, int 
numPartitions) {
+      this(
+          new WorkersSupplier(hostsPerRack),
+          numPartitions,
+          // 1 + (maxHostsPerRack * numPartitions) / (minHostsPerRack * 
totalHosts)

Review Comment:
   Will update, thanks!
   My intent was to keep the bound as close to the uniform hosts-per-rack case as
possible (so that it still makes sense when the distribution is equal), while
ensuring that the maximum number of replicas per node cannot become a hotspot (in
case the distribution algorithm changes in the future).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to