somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085023573


##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -105,568 +105,672 @@ public void setUp()
   @Test
   public void testRebalance()
       throws Exception {
-    int numServers = 3;
-    // Mock disk usage
-    Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>();
-
-    for (int i = 0; i < numServers; i++) {
-      String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
-      addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
-      DiskUsageInfo diskUsageInfo1 =
-          new DiskUsageInfo(instanceId, "", 1000L, 500L, 
System.currentTimeMillis());
-      diskUsageInfoMap.put(instanceId, diskUsageInfo1);
-    }
-
-    ExecutorService executorService = Executors.newFixedThreadPool(10);
-    DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
-    preChecker.init(_helixResourceManager, executorService, 1);
-    TableRebalancer tableRebalancer =
-        new TableRebalancer(_helixManager, null, null, preChecker, 
_helixResourceManager.getTableSizeReader());
-    TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
-
-    // Rebalance should fail without creating the table
-    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
new RebalanceConfig(), null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
-    assertNull(rebalanceResult.getRebalanceSummaryResult());
-
-    // Rebalance with dry-run summary should fail without creating the table
-    RebalanceConfig rebalanceConfig = new RebalanceConfig();
-    rebalanceConfig.setDryRun(true);
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
-    assertNull(rebalanceResult.getRebalanceSummaryResult());
-
-    // Create the table
-    addDummySchema(RAW_TABLE_NAME);
-    _helixResourceManager.addTable(tableConfig);
-
-    // Add the segments
-    int numSegments = 10;
-    for (int i = 0; i < numSegments; i++) {
-      _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-          SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, 
SEGMENT_NAME_PREFIX + i), null);
-    }
-    Map<String, Map<String, String>> oldSegmentAssignment =
-        
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
-
-    // Rebalance with dry-run summary should return NO_OP status
-    rebalanceConfig = new RebalanceConfig();
-    rebalanceConfig.setDryRun(true);
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
-    RebalanceSummaryResult rebalanceSummaryResult = 
rebalanceResult.getRebalanceSummaryResult();
-    assertNotNull(rebalanceSummaryResult);
-    assertNotNull(rebalanceSummaryResult.getServerInfo());
-    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
-    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
-    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
 0);
-    
assertNull(rebalanceSummaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary());
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
-    assertNotNull(rebalanceSummaryResult.getTagsInfo());
-    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
-    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
-        TagNameUtils.getOfflineTagForTenant(null));
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
 numSegments * NUM_REPLICAS);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
 numServers);
-    assertNotNull(rebalanceResult.getInstanceAssignment());
-    assertNotNull(rebalanceResult.getSegmentAssignment());
-
-    // Dry-run mode should not change the IdealState
-    
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
-        oldSegmentAssignment);
-
-    // Rebalance should return NO_OP status
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
-
-    // All servers should be assigned to the table
-    Map<InstancePartitionsType, InstancePartitions> instanceAssignment = 
rebalanceResult.getInstanceAssignment();
-    assertEquals(instanceAssignment.size(), 1);
-    InstancePartitions instancePartitions = 
instanceAssignment.get(InstancePartitionsType.OFFLINE);
-    assertEquals(instancePartitions.getNumReplicaGroups(), 1);
-    assertEquals(instancePartitions.getNumPartitions(), 1);
-    // Math.abs("testTable_OFFLINE".hashCode()) % 3 = 2
-    assertEquals(instancePartitions.getInstances(0, 0),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 0, SERVER_INSTANCE_ID_PREFIX + 1));
-
-    // Segment assignment should not change
-    assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
-
-    // Add 3 more servers
-    int numServersToAdd = 3;
-    for (int i = 0; i < numServersToAdd; i++) {
-      String instanceId = SERVER_INSTANCE_ID_PREFIX + (numServers + i);
-      addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
-      DiskUsageInfo diskUsageInfo =
-          new DiskUsageInfo(instanceId, "", 1000L, 500L, 
System.currentTimeMillis());
-      diskUsageInfoMap.put(instanceId, diskUsageInfo);
-    }
-
-    ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap);
-
-    // Rebalance in dry-run summary mode with added servers
-    rebalanceConfig = new RebalanceConfig();
-    rebalanceConfig.setDryRun(true);
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
-    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
-    assertNotNull(rebalanceSummaryResult);
-    assertNotNull(rebalanceSummaryResult.getServerInfo());
-    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
-    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 14);
-    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
 14);
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 3);
-    assertNotNull(rebalanceSummaryResult.getTagsInfo());
-    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
-    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
-        TagNameUtils.getOfflineTagForTenant(null));
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 14);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
-        numSegments * NUM_REPLICAS - 14);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
-        numServers + numServersToAdd);
-    assertNotNull(rebalanceResult.getInstanceAssignment());
-    assertNotNull(rebalanceResult.getSegmentAssignment());
-
-    Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
serverSegmentChangeInfoMap =
-        rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo();
-    assertNotNull(serverSegmentChangeInfoMap);
-    for (int i = 0; i < numServers; i++) {
-      // Original servers should be losing some segments
-      String newServer = SERVER_INSTANCE_ID_PREFIX + i;
-      RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = 
serverSegmentChangeInfoMap.get(newServer);
-      assertTrue(serverSegmentChange.getSegmentsDeleted() > 0);
-      assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0);
-      assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0);
-      assertTrue(serverSegmentChange.getTotalSegmentsAfterRebalance() > 0);
-      assertEquals(serverSegmentChange.getSegmentsAdded(), 0);
-    }
-    for (int i = 0; i < numServersToAdd; i++) {
-      // New servers should only get new segments
-      String newServer = SERVER_INSTANCE_ID_PREFIX + (numServers + i);
-      RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = 
serverSegmentChangeInfoMap.get(newServer);
-      assertTrue(serverSegmentChange.getSegmentsAdded() > 0);
-      assertEquals(serverSegmentChange.getTotalSegmentsBeforeRebalance(), 0);
-      assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 
serverSegmentChange.getSegmentsAdded());
-      assertEquals(serverSegmentChange.getSegmentsDeleted(), 0);
-      assertEquals(serverSegmentChange.getSegmentsUnchanged(), 0);
-    }
-
-    // Dry-run mode should not change the IdealState
-    
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
-        oldSegmentAssignment);
-
-    // Rebalance in dry-run mode
-    rebalanceConfig = new RebalanceConfig();
-    rebalanceConfig.setDryRun(true);
-    rebalanceConfig.setPreChecks(true);
-    rebalanceConfig.setReassignInstances(true);
-
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
-    Map<String, RebalancePreCheckerResult> preCheckResult = 
rebalanceResult.getPreChecksResult();
-    assertNotNull(preCheckResult);
-    assertEquals(preCheckResult.size(), 6);
-    
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS));
-    
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT));
-    
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE));
-    
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE));
-    
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS));
-    
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO));
-    // Sending request to servers should fail for all, so needsPreprocess should be set to "error" to indicate that a
-    // manual check is needed
-    
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS).getPreCheckStatus(),
-        RebalancePreCheckerResult.PreCheckStatus.ERROR);
-    
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS).getMessage(),
-        "Could not determine needReload status, run needReload API manually");
-    
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getPreCheckStatus(),
-        RebalancePreCheckerResult.PreCheckStatus.PASS);
-    
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage(),
-        "Instance assignment not allowed, no need for minimizeDataMovement");
-    
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE).getPreCheckStatus(),
-        RebalancePreCheckerResult.PreCheckStatus.PASS);
-    assertTrue(
-        
preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)
-            .getMessage()
-            .startsWith("Within threshold"));
-    
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE).getPreCheckStatus(),
-        RebalancePreCheckerResult.PreCheckStatus.PASS);
-    assertTrue(
-        
preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)
-            .getMessage()
-            .startsWith("Within threshold"));
-    
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getPreCheckStatus(),
-        RebalancePreCheckerResult.PreCheckStatus.PASS);
-    
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getMessage(),
-        "All rebalance parameters look good");
-    
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(),
-        RebalancePreCheckerResult.PreCheckStatus.PASS);
-    
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(),
-        "OFFLINE segments - Replica Groups are not enabled, replication: " + 
NUM_REPLICAS);
-
-    // All servers should be assigned to the table
-    instanceAssignment = rebalanceResult.getInstanceAssignment();
-    assertEquals(instanceAssignment.size(), 1);
-    instancePartitions = 
instanceAssignment.get(InstancePartitionsType.OFFLINE);
-    assertEquals(instancePartitions.getNumReplicaGroups(), 1);
-    assertEquals(instancePartitions.getNumPartitions(), 1);
-    // Math.abs("testTable_OFFLINE".hashCode()) % 6 = 2
-    assertEquals(instancePartitions.getInstances(0, 0),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 3, SERVER_INSTANCE_ID_PREFIX + 4,
-            SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 0, 
SERVER_INSTANCE_ID_PREFIX + 1));
-
-    // Segments should be moved to the new added servers
-    Map<String, Map<String, String>> newSegmentAssignment = 
rebalanceResult.getSegmentAssignment();
-    Map<String, IntIntPair> instanceToNumSegmentsToMoveMap =
-        
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(oldSegmentAssignment, 
newSegmentAssignment);
-    assertEquals(instanceToNumSegmentsToMoveMap.size(), numServers + 
numServersToAdd);
-    for (int i = 0; i < numServersToAdd; i++) {
-      IntIntPair numSegmentsToMove = 
instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + (numServers + 
i));
-      assertNotNull(numSegmentsToMove);
-      assertTrue(numSegmentsToMove.leftInt() > 0);
-      assertEquals(numSegmentsToMove.rightInt(), 0);
-    }
-    for (int i = 0; i < numServers; i++) {
-      IntIntPair numSegmentsToMove = 
instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + i);
-      assertNotNull(numSegmentsToMove);
-      assertEquals(numSegmentsToMove.leftInt(), 0);
-      assertTrue(numSegmentsToMove.rightInt() > 0);
-    }
-
-    // Dry-run mode should not change the IdealState
-    
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
-        oldSegmentAssignment);
-
-    // Rebalance dry-run summary with 3 min available replicas should not be impacted since actual rebalance does not
-    // occur
-    rebalanceConfig = new RebalanceConfig();
-    rebalanceConfig.setDryRun(true);
-    rebalanceConfig.setPreChecks(true);
-    rebalanceConfig.setMinAvailableReplicas(3);
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
-    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
-    assertNotNull(rebalanceSummaryResult);
-    assertNotNull(rebalanceSummaryResult.getServerInfo());
-    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
-    assertNotNull(rebalanceResult.getPreChecksResult());
-    assertNotNull(rebalanceResult.getInstanceAssignment());
-    assertNotNull(rebalanceResult.getSegmentAssignment());
-
-    // Rebalance with 3 min available replicas should fail as the table only have 3 replicas
-    rebalanceConfig = new RebalanceConfig();
-    rebalanceConfig.setMinAvailableReplicas(3);
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
-
-    // IdealState should not change for FAILED rebalance
-    
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
-        oldSegmentAssignment);
-
-    // Rebalance with 2 min available replicas should succeed
-    rebalanceConfig = new RebalanceConfig();
-    rebalanceConfig.setMinAvailableReplicas(2);
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
-
-    // Result should be the same as the result in dry-run mode
-    instanceAssignment = rebalanceResult.getInstanceAssignment();
-    assertEquals(instanceAssignment.size(), 1);
-    
assertEquals(instanceAssignment.get(InstancePartitionsType.OFFLINE).getPartitionToInstancesMap(),
-        instancePartitions.getPartitionToInstancesMap());
-    assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment);
-
-    // Update the table config to use replica-group based assignment
-    InstanceTagPoolConfig tagPoolConfig =
-        new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), 
false, 0, null);
-    InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
-        new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 
0, false, null);
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, false)));
-    _helixResourceManager.updateTableConfig(tableConfig);
-
-    // Try dry-run summary mode
-    // No need to reassign instances because instances should be automatically assigned when updating the table config
-    rebalanceConfig = new RebalanceConfig();
-    rebalanceConfig.setDryRun(true);
-    rebalanceConfig.setPreChecks(true);
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
-    // Though instance partition map is set in ZK, the pre-checker is unaware of that, a warning will be thrown
-    assertEquals(
-        
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(),
-        RebalancePreCheckerResult.PreCheckStatus.WARN);
-    
assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(),
-        "reassignInstances is disabled, replica groups may not be 
updated.\nOFFLINE segments - numReplicaGroups: "
-            + NUM_REPLICAS + ", numInstancesPerReplicaGroup: 0 (using as many 
instances as possible)");
-    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
-    assertNotNull(rebalanceSummaryResult);
-    assertNotNull(rebalanceSummaryResult.getServerInfo());
-    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
-    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 11);
-    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
 11);
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
-    assertNotNull(rebalanceSummaryResult.getTagsInfo());
-    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
-    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
-        TagNameUtils.getOfflineTagForTenant(null));
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 11);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
-        numSegments * NUM_REPLICAS - 11);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
-        numServers + numServersToAdd);
-    assertNotNull(rebalanceResult.getInstanceAssignment());
-    assertNotNull(rebalanceResult.getSegmentAssignment());
-
-    serverSegmentChangeInfoMap = 
rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo();
-    assertNotNull(serverSegmentChangeInfoMap);
-    for (int i = 0; i < numServers + numServersToAdd; i++) {
-      String newServer = SERVER_INSTANCE_ID_PREFIX + i;
-      RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = 
serverSegmentChangeInfoMap.get(newServer);
-      assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 5);
-      // Ensure not all segments moved
-      assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0);
-      // Ensure all segments has something assigned prior to rebalance
-      assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0);
-    }
-
-    // Dry-run mode should not change the IdealState
-    
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
-        newSegmentAssignment);
-
-    // Try actual rebalance
-    // No need to reassign instances because instances should be automatically assigned when updating the table config
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    for (int batchSizePerServer : 
Arrays.asList(RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER, 1, 2)) {
+      int numServers = 3;
+      // Mock disk usage
+      Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>();
+
+      for (int i = 0; i < numServers; i++) {
+        String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
+        addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+        DiskUsageInfo diskUsageInfo1 =
+            new DiskUsageInfo(instanceId, "", 1000L, 500L, 
System.currentTimeMillis());
+        diskUsageInfoMap.put(instanceId, diskUsageInfo1);
+      }
 
-    // There should be 3 replica-groups, each with 2 servers
-    instanceAssignment = rebalanceResult.getInstanceAssignment();
-    assertEquals(instanceAssignment.size(), 1);
-    instancePartitions = 
instanceAssignment.get(InstancePartitionsType.OFFLINE);
-    assertEquals(instancePartitions.getNumReplicaGroups(), NUM_REPLICAS);
-    assertEquals(instancePartitions.getNumPartitions(), 1);
-    // Math.abs("testTable_OFFLINE".hashCode()) % 6 = 2
-    // [i2, i3, i4, i5, i0, i1]
-    //  r0  r1  r2  r0  r1  r2
-    assertEquals(instancePartitions.getInstances(0, 0),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 5));
-    assertEquals(instancePartitions.getInstances(0, 1),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX 
+ 0));
-    assertEquals(instancePartitions.getInstances(0, 2),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX 
+ 1));
+      ExecutorService executorService = Executors.newFixedThreadPool(10);
+      DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
+      preChecker.init(_helixResourceManager, executorService, 1);
+      TableRebalancer tableRebalancer =
+          new TableRebalancer(_helixManager, null, null, preChecker, 
_helixResourceManager.getTableSizeReader());
+      TableConfig tableConfig =
+          new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+
+      // Rebalance should fail without creating the table
+      RebalanceConfig rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setBatchSizePerServer(batchSizePerServer);
+      RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
+      assertNull(rebalanceResult.getRebalanceSummaryResult());
+
+      // Rebalance with dry-run summary should fail without creating the table
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setDryRun(true);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
+      assertNull(rebalanceResult.getRebalanceSummaryResult());
+
+      // Create the table
+      addDummySchema(RAW_TABLE_NAME);
+      _helixResourceManager.addTable(tableConfig);
+
+      // Add the segments
+      int numSegments = 10;
+      for (int i = 0; i < numSegments; i++) {
+        _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+            SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, 
SEGMENT_NAME_PREFIX + i), null);
+      }
+      Map<String, Map<String, String>> oldSegmentAssignment =
+          
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
+
+      // Rebalance with dry-run summary should return NO_OP status
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setDryRun(true);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+      RebalanceSummaryResult rebalanceSummaryResult = 
rebalanceResult.getRebalanceSummaryResult();
+      assertNotNull(rebalanceSummaryResult);
+      assertNotNull(rebalanceSummaryResult.getServerInfo());
+      assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
+      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
 0);
+      
assertNull(rebalanceSummaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary());
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
+      assertNotNull(rebalanceSummaryResult.getTagsInfo());
+      assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+          TagNameUtils.getOfflineTagForTenant(null));
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
 numSegments * NUM_REPLICAS);
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
 numServers);
+      assertNotNull(rebalanceResult.getInstanceAssignment());
+      assertNotNull(rebalanceResult.getSegmentAssignment());
+
+      // Dry-run mode should not change the IdealState
+      
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
+          oldSegmentAssignment);
+
+      // Rebalance should return NO_OP status
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setBatchSizePerServer(batchSizePerServer);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+
+      // All servers should be assigned to the table
+      Map<InstancePartitionsType, InstancePartitions> instanceAssignment = 
rebalanceResult.getInstanceAssignment();
+      assertEquals(instanceAssignment.size(), 1);
+      InstancePartitions instancePartitions = 
instanceAssignment.get(InstancePartitionsType.OFFLINE);
+      assertEquals(instancePartitions.getNumReplicaGroups(), 1);
+      assertEquals(instancePartitions.getNumPartitions(), 1);
+      // Math.abs("testTable_OFFLINE".hashCode()) % 3 = 2
+      assertEquals(instancePartitions.getInstances(0, 0),
+          Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, 
SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1));
+
+      // Segment assignment should not change
+      assertEquals(rebalanceResult.getSegmentAssignment(), 
oldSegmentAssignment);
+
+      // Add 3 more servers
+      int numServersToAdd = 3;
+      for (int i = 0; i < numServersToAdd; i++) {
+        String instanceId = SERVER_INSTANCE_ID_PREFIX + (numServers + i);
+        addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+        DiskUsageInfo diskUsageInfo =
+            new DiskUsageInfo(instanceId, "", 1000L, 500L, 
System.currentTimeMillis());
+        diskUsageInfoMap.put(instanceId, diskUsageInfo);
+      }
 
-    // The assignment are based on replica-group 0 and mirrored to all the replica-groups, so server of index 0, 1, 5
-    // should have the same segments assigned, and server of index 2, 3, 4 should have the same segments assigned, each
-    // with 5 segments
-    newSegmentAssignment = rebalanceResult.getSegmentAssignment();
-    int numSegmentsOnServer0 = 0;
-    for (int i = 0; i < numSegments; i++) {
-      String segmentName = SEGMENT_NAME_PREFIX + i;
-      Map<String, String> instanceStateMap = 
newSegmentAssignment.get(segmentName);
-      assertEquals(instanceStateMap.size(), NUM_REPLICAS);
-      if (instanceStateMap.containsKey(SERVER_INSTANCE_ID_PREFIX + 0)) {
-        numSegmentsOnServer0++;
-        assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 0), 
ONLINE);
-        assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 1), 
ONLINE);
-        assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 5), 
ONLINE);
-      } else {
-        assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 2), 
ONLINE);
-        assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 3), 
ONLINE);
-        assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 4), 
ONLINE);
+      ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap);
+
+      // Rebalance in dry-run summary mode with added servers
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setDryRun(true);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+      rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+      assertNotNull(rebalanceSummaryResult);
+      assertNotNull(rebalanceSummaryResult.getServerInfo());
+      assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 14);
+      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
 14);
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 3);
+      assertNotNull(rebalanceSummaryResult.getTagsInfo());
+      assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+          TagNameUtils.getOfflineTagForTenant(null));
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 14);
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+          numSegments * NUM_REPLICAS - 14);
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+          numServers + numServersToAdd);
+      assertNotNull(rebalanceResult.getInstanceAssignment());
+      assertNotNull(rebalanceResult.getSegmentAssignment());
+
+      Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
serverSegmentChangeInfoMap =
+          rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo();
+      assertNotNull(serverSegmentChangeInfoMap);
+      for (int i = 0; i < numServers; i++) {
+        // Original servers should be losing some segments
+        String newServer = SERVER_INSTANCE_ID_PREFIX + i;
+        RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = 
serverSegmentChangeInfoMap.get(newServer);
+        assertTrue(serverSegmentChange.getSegmentsDeleted() > 0);
+        assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0);
+        assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0);
+        assertTrue(serverSegmentChange.getTotalSegmentsAfterRebalance() > 0);
+        assertEquals(serverSegmentChange.getSegmentsAdded(), 0);
+      }
+      for (int i = 0; i < numServersToAdd; i++) {
+        // New servers should only get new segments
+        String newServer = SERVER_INSTANCE_ID_PREFIX + (numServers + i);
+        RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = 
serverSegmentChangeInfoMap.get(newServer);
+        assertTrue(serverSegmentChange.getSegmentsAdded() > 0);
+        assertEquals(serverSegmentChange.getTotalSegmentsBeforeRebalance(), 0);
+        assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 
serverSegmentChange.getSegmentsAdded());
+        assertEquals(serverSegmentChange.getSegmentsDeleted(), 0);
+        assertEquals(serverSegmentChange.getSegmentsUnchanged(), 0);
       }
-    }
-    assertEquals(numSegmentsOnServer0, numSegments / 2);
 
-    // Update the table config to use non-replica-group based assignment
-    tableConfig.setInstanceAssignmentConfigMap(null);
-    _helixResourceManager.updateTableConfig(tableConfig);
+      // Dry-run mode should not change the IdealState
+      
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
+          oldSegmentAssignment);
+
+      // Rebalance in dry-run mode
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setDryRun(true);
+      rebalanceConfig.setPreChecks(true);
+      rebalanceConfig.setReassignInstances(true);
+
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+      Map<String, RebalancePreCheckerResult> preCheckResult = 
rebalanceResult.getPreChecksResult();
+      assertNotNull(preCheckResult);
+      assertEquals(preCheckResult.size(), 6);
+      
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS));
+      
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT));
+      
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE));
+      
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE));
+      
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS));
+      
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO));
+      // Sending request to servers should fail for all, so needsPreprocess 
should be set to "error" to indicate that a
+      // manual check is needed
+      
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS).getPreCheckStatus(),
+          RebalancePreCheckerResult.PreCheckStatus.ERROR);
+      
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS).getMessage(),
+          "Could not determine needReload status, run needReload API 
manually");
+      
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getPreCheckStatus(),
+          RebalancePreCheckerResult.PreCheckStatus.PASS);
+      
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage(),
+          "Instance assignment not allowed, no need for minimizeDataMovement");
+      
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE).getPreCheckStatus(),
+          RebalancePreCheckerResult.PreCheckStatus.PASS);
+      assertTrue(
+          
preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)
+              .getMessage()
+              .startsWith("Within threshold"));
+      
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE).getPreCheckStatus(),
+          RebalancePreCheckerResult.PreCheckStatus.PASS);
+      assertTrue(
+          
preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)
+              .getMessage()
+              .startsWith("Within threshold"));
+      
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getPreCheckStatus(),
+          RebalancePreCheckerResult.PreCheckStatus.PASS);
+      
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getMessage(),
+          "All rebalance parameters look good");
+      
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(),
+          RebalancePreCheckerResult.PreCheckStatus.PASS);
+      
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(),
+          "OFFLINE segments - Replica Groups are not enabled, replication: " + 
NUM_REPLICAS);
+
+      // All servers should be assigned to the table
+      instanceAssignment = rebalanceResult.getInstanceAssignment();
+      assertEquals(instanceAssignment.size(), 1);
+      instancePartitions = 
instanceAssignment.get(InstancePartitionsType.OFFLINE);
+      assertEquals(instancePartitions.getNumReplicaGroups(), 1);
+      assertEquals(instancePartitions.getNumPartitions(), 1);
+      // Math.abs("testTable_OFFLINE".hashCode()) % 6 = 2
+      assertEquals(instancePartitions.getInstances(0, 0),
+          Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, 
SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4,
+              SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 0, 
SERVER_INSTANCE_ID_PREFIX + 1));
+
+      // Segments should be moved to the new added servers
+      Map<String, Map<String, String>> newSegmentAssignment = 
rebalanceResult.getSegmentAssignment();
+      Map<String, IntIntPair> instanceToNumSegmentsToMoveMap =
+          
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(oldSegmentAssignment, 
newSegmentAssignment);
+      assertEquals(instanceToNumSegmentsToMoveMap.size(), numServers + 
numServersToAdd);
+      for (int i = 0; i < numServersToAdd; i++) {
+        IntIntPair numSegmentsToMove = 
instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + (numServers + 
i));
+        assertNotNull(numSegmentsToMove);
+        assertTrue(numSegmentsToMove.leftInt() > 0);
+        assertEquals(numSegmentsToMove.rightInt(), 0);
+      }
+      for (int i = 0; i < numServers; i++) {
+        IntIntPair numSegmentsToMove = 
instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + i);
+        assertNotNull(numSegmentsToMove);
+        assertEquals(numSegmentsToMove.leftInt(), 0);
+        assertTrue(numSegmentsToMove.rightInt() > 0);
+      }
 
-    // Try dry-run summary mode without reassignment to ensure that existing 
instance partitions are used
-    // no movement should occur
-    rebalanceConfig = new RebalanceConfig();
-    rebalanceConfig.setDryRun(true);
-    rebalanceConfig.setPreChecks(true);
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
-    assertEquals(
-        
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(),
-        RebalancePreCheckerResult.PreCheckStatus.PASS);
-    
assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(),
-        "OFFLINE segments - Replica Groups are not enabled, replication: " + 
NUM_REPLICAS);
-    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
-    assertNotNull(rebalanceSummaryResult);
-    assertNotNull(rebalanceSummaryResult.getServerInfo());
-    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
-    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
-    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
 0);
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
-    assertNotNull(rebalanceSummaryResult.getTagsInfo());
-    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
-    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
-        TagNameUtils.getOfflineTagForTenant(null));
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
-        numSegments * NUM_REPLICAS);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
-        numServers + numServersToAdd);
-    assertNotNull(rebalanceResult.getInstanceAssignment());
-    assertNotNull(rebalanceResult.getSegmentAssignment());
+      // Dry-run mode should not change the IdealState
+      
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
+          oldSegmentAssignment);
+
+      // Rebalance dry-run summary with 3 min available replicas should not be 
impacted since actual rebalance does not
+      // occur
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setDryRun(true);
+      rebalanceConfig.setPreChecks(true);
+      rebalanceConfig.setMinAvailableReplicas(3);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+      rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+      assertNotNull(rebalanceSummaryResult);
+      assertNotNull(rebalanceSummaryResult.getServerInfo());
+      assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
+      assertNotNull(rebalanceResult.getPreChecksResult());
+      assertNotNull(rebalanceResult.getInstanceAssignment());
+      assertNotNull(rebalanceResult.getSegmentAssignment());
+
+      // Rebalance with 3 min available replicas should fail as the table only 
has 3 replicas
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setMinAvailableReplicas(3);
+      rebalanceConfig.setBatchSizePerServer(batchSizePerServer);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
+
+      // IdealState should not change for FAILED rebalance
+      
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
+          oldSegmentAssignment);
+
+      // Rebalance with 2 min available replicas should succeed
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setMinAvailableReplicas(2);
+      rebalanceConfig.setBatchSizePerServer(batchSizePerServer);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
+      // Result should be the same as the result in dry-run mode
+      instanceAssignment = rebalanceResult.getInstanceAssignment();
+      assertEquals(instanceAssignment.size(), 1);
+      
assertEquals(instanceAssignment.get(InstancePartitionsType.OFFLINE).getPartitionToInstancesMap(),
+          instancePartitions.getPartitionToInstancesMap());
+      assertEquals(rebalanceResult.getSegmentAssignment(), 
newSegmentAssignment);
+
+      // Update the table config to use replica-group based assignment
+      InstanceTagPoolConfig tagPoolConfig =
+          new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), 
false, 0, null);
+      InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
+          new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 
0, false, null);
+      
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+          new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, false)));
+      _helixResourceManager.updateTableConfig(tableConfig);
+
+      // Try dry-run summary mode
+      // No need to reassign instances because instances should be 
automatically assigned when updating the table config
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setDryRun(true);
+      rebalanceConfig.setPreChecks(true);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+      // Though instance partition map is set in ZK, the pre-checker is 
unaware of that, a warning will be thrown
+      assertEquals(
+          
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(),
+          RebalancePreCheckerResult.PreCheckStatus.WARN);
+      
assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO)
+              .getMessage(), "reassignInstances is disabled, replica groups 
may not be updated.\nOFFLINE segments "
+          + "- numReplicaGroups: " + NUM_REPLICAS + ", 
numInstancesPerReplicaGroup: 0 (using as many instances as "
+          + "possible)");
+      rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+      assertNotNull(rebalanceSummaryResult);
+      assertNotNull(rebalanceSummaryResult.getServerInfo());
+      assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 11);
+      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
 11);
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
+      assertNotNull(rebalanceSummaryResult.getTagsInfo());
+      assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+          TagNameUtils.getOfflineTagForTenant(null));
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 11);
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+          numSegments * NUM_REPLICAS - 11);
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+          numServers + numServersToAdd);
+      assertNotNull(rebalanceResult.getInstanceAssignment());
+      assertNotNull(rebalanceResult.getSegmentAssignment());
+
+      serverSegmentChangeInfoMap = 
rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo();
+      assertNotNull(serverSegmentChangeInfoMap);
+      for (int i = 0; i < numServers + numServersToAdd; i++) {
+        String newServer = SERVER_INSTANCE_ID_PREFIX + i;
+        RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = 
serverSegmentChangeInfoMap.get(newServer);
+        assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 5);
+        // Ensure not all segments moved
+        assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0);
+        // Ensure every server has segments assigned prior to rebalance
+        assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0);
+      }
 
-    // Without instances reassignment, the rebalance should return status 
NO_OP, and the existing instance partitions
-    // should be used
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
-    assertEquals(rebalanceResult.getInstanceAssignment(), instanceAssignment);
-    assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment);
+      // Dry-run mode should not change the IdealState
+      
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
+          newSegmentAssignment);
+
+      // Try actual rebalance
+      // No need to reassign instances because instances should be 
automatically assigned when updating the table config
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setBatchSizePerServer(batchSizePerServer);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
+      // There should be 3 replica-groups, each with 2 servers
+      instanceAssignment = rebalanceResult.getInstanceAssignment();
+      assertEquals(instanceAssignment.size(), 1);
+      instancePartitions = 
instanceAssignment.get(InstancePartitionsType.OFFLINE);
+      assertEquals(instancePartitions.getNumReplicaGroups(), NUM_REPLICAS);
+      assertEquals(instancePartitions.getNumPartitions(), 1);
+      // Math.abs("testTable_OFFLINE".hashCode()) % 6 = 2
+      // [i2, i3, i4, i5, i0, i1]
+      //  r0  r1  r2  r0  r1  r2
+      assertEquals(instancePartitions.getInstances(0, 0),
+          Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, 
SERVER_INSTANCE_ID_PREFIX + 5));
+      assertEquals(instancePartitions.getInstances(0, 1),
+          Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, 
SERVER_INSTANCE_ID_PREFIX + 0));
+      assertEquals(instancePartitions.getInstances(0, 2),
+          Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, 
SERVER_INSTANCE_ID_PREFIX + 1));
+
+      // The assignment is based on replica-group 0 and mirrored to all the 
replica-groups, so server of index 0, 1, 5
+      // should have the same segments assigned, and server of index 2, 3, 4 
should have the same segments assigned,
+      // each with 5 segments
+      newSegmentAssignment = rebalanceResult.getSegmentAssignment();
+      int numSegmentsOnServer0 = 0;
+      for (int i = 0; i < numSegments; i++) {
+        String segmentName = SEGMENT_NAME_PREFIX + i;
+        Map<String, String> instanceStateMap = 
newSegmentAssignment.get(segmentName);
+        assertEquals(instanceStateMap.size(), NUM_REPLICAS);
+        if (instanceStateMap.containsKey(SERVER_INSTANCE_ID_PREFIX + 0)) {
+          numSegmentsOnServer0++;
+          assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 0), 
ONLINE);
+          assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 1), 
ONLINE);
+          assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 5), 
ONLINE);
+        } else {
+          assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 2), 
ONLINE);
+          assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 3), 
ONLINE);
+          assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 4), 
ONLINE);
+        }
+      }
+      assertEquals(numSegmentsOnServer0, numSegments / 2);
+
+      // Update the table config to use non-replica-group based assignment
+      tableConfig.setInstanceAssignmentConfigMap(null);
+      _helixResourceManager.updateTableConfig(tableConfig);
+
+      // Try dry-run summary mode without reassignment to ensure that existing 
instance partitions are used
+      // no movement should occur
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setDryRun(true);
+      rebalanceConfig.setPreChecks(true);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+      assertEquals(
+          
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(),
+          RebalancePreCheckerResult.PreCheckStatus.PASS);
+      
assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO)
+              .getMessage(), "OFFLINE segments - Replica Groups are not 
enabled, replication: " + NUM_REPLICAS);
+      rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+      assertNotNull(rebalanceSummaryResult);
+      assertNotNull(rebalanceSummaryResult.getServerInfo());
+      assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
+      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
 0);
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
+      assertNotNull(rebalanceSummaryResult.getTagsInfo());
+      assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+          TagNameUtils.getOfflineTagForTenant(null));
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+          numSegments * NUM_REPLICAS);
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+          numServers + numServersToAdd);
+      assertNotNull(rebalanceResult.getInstanceAssignment());
+      assertNotNull(rebalanceResult.getSegmentAssignment());
+
+      // Without instances reassignment, the rebalance should return status 
NO_OP, and the existing instance partitions
+      // should be used
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setBatchSizePerServer(batchSizePerServer);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+      assertEquals(rebalanceResult.getInstanceAssignment(), 
instanceAssignment);
+      assertEquals(rebalanceResult.getSegmentAssignment(), 
newSegmentAssignment);
+
+      // Try dry-run summary mode with reassignment
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setDryRun(true);
+      rebalanceConfig.setPreChecks(true);
+      rebalanceConfig.setReassignInstances(true);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+      assertEquals(
+          
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(),
+          RebalancePreCheckerResult.PreCheckStatus.PASS);
+      
assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO)
+              .getMessage(), "OFFLINE segments - Replica Groups are not 
enabled, replication: " + NUM_REPLICAS);
+      rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+      assertNotNull(rebalanceSummaryResult);
+      assertNotNull(rebalanceSummaryResult.getServerInfo());
+      assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+      // No move expected since already balanced
+      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
+      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
 0);
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
+      assertNotNull(rebalanceSummaryResult.getTagsInfo());
+      assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+          TagNameUtils.getOfflineTagForTenant(null));
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+          numSegments * NUM_REPLICAS);
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+          numServers + numServersToAdd);
+      assertNotNull(rebalanceResult.getInstanceAssignment());
+      assertNotNull(rebalanceResult.getSegmentAssignment());
+
+      // With instances reassignment, the rebalance should return status DONE, 
the existing instance partitions should
+      // be removed, and the default instance partitions should be used
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setReassignInstances(true);
+      rebalanceConfig.setBatchSizePerServer(batchSizePerServer);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+      
assertNull(InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+          
InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME)));
+
+      // All servers should be assigned to the table
+      instanceAssignment = rebalanceResult.getInstanceAssignment();
+      assertEquals(instanceAssignment.size(), 1);
+      instancePartitions = 
instanceAssignment.get(InstancePartitionsType.OFFLINE);
+      assertEquals(instancePartitions.getNumReplicaGroups(), 1);
+      assertEquals(instancePartitions.getNumPartitions(), 1);
+      // Math.abs("testTable_OFFLINE".hashCode()) % 6 = 2
+      assertEquals(instancePartitions.getInstances(0, 0),
+          Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, 
SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4,
+              SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 0, 
SERVER_INSTANCE_ID_PREFIX + 1));
+
+      // Segment assignment should not change as it is already balanced
+      assertEquals(rebalanceResult.getSegmentAssignment(), 
newSegmentAssignment);
+
+      // Remove the tag from the added servers
+      for (int i = 0; i < numServersToAdd; i++) {
+        _helixAdmin.removeInstanceTag(getHelixClusterName(), 
SERVER_INSTANCE_ID_PREFIX + (numServers + i),
+            TagNameUtils.getOfflineTagForTenant(null));
+      }
 
-    // Try dry-run summary mode with reassignment
-    rebalanceConfig = new RebalanceConfig();
-    rebalanceConfig.setDryRun(true);
-    rebalanceConfig.setPreChecks(true);
-    rebalanceConfig.setReassignInstances(true);
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
-    assertEquals(
-        
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(),
-        RebalancePreCheckerResult.PreCheckStatus.PASS);
-    
assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(),
-        "OFFLINE segments - Replica Groups are not enabled, replication: " + 
NUM_REPLICAS);
-    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
-    assertNotNull(rebalanceSummaryResult);
-    assertNotNull(rebalanceSummaryResult.getServerInfo());
-    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
-    // No move expected since already balanced
-    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
-    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
 0);
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
-    assertNotNull(rebalanceSummaryResult.getTagsInfo());
-    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
-    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
-        TagNameUtils.getOfflineTagForTenant(null));
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
-        numSegments * NUM_REPLICAS);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
-        numServers + numServersToAdd);
-    assertNotNull(rebalanceResult.getInstanceAssignment());
-    assertNotNull(rebalanceResult.getSegmentAssignment());
+      // Try dry-run summary mode
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setDryRun(true);
+      rebalanceConfig.setDowntime(true);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+      rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+      assertNotNull(rebalanceSummaryResult);
+      assertNotNull(rebalanceSummaryResult.getServerInfo());
+      assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 15);
+      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
 15);
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
+      
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 3);
+      assertNotNull(rebalanceSummaryResult.getTagsInfo());
+      assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+          TagNameUtils.getOfflineTagForTenant(null));
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 15);
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+          numSegments * NUM_REPLICAS - 15);
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+          numServers);
+      assertNotNull(rebalanceResult.getInstanceAssignment());
+      assertNotNull(rebalanceResult.getSegmentAssignment());
+
+      // Rebalance with downtime should succeed
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setDowntime(true);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
+      // All servers with tag should be assigned to the table
+      instanceAssignment = rebalanceResult.getInstanceAssignment();
+      assertEquals(instanceAssignment.size(), 1);
+      instancePartitions = 
instanceAssignment.get(InstancePartitionsType.OFFLINE);
+      assertEquals(instancePartitions.getNumReplicaGroups(), 1);
+      assertEquals(instancePartitions.getNumPartitions(), 1);
+      // Math.abs("testTable_OFFLINE".hashCode()) % 3 = 2
+      assertEquals(instancePartitions.getInstances(0, 0),
+          Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, 
SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1));
+
+      // New segment assignment should not contain servers without tag
+      newSegmentAssignment = rebalanceResult.getSegmentAssignment();
+      for (int i = 0; i < numSegments; i++) {
+        String segmentName = SEGMENT_NAME_PREFIX + i;
+        Map<String, String> instanceStateMap = 
newSegmentAssignment.get(segmentName);
+        assertEquals(instanceStateMap.size(), NUM_REPLICAS);
+        for (int j = 0; j < numServersToAdd; j++) {
+          assertFalse(instanceStateMap.containsKey(SERVER_INSTANCE_ID_PREFIX + 
(numServers + j)));
+        }
+      }
 
-    // With instances reassignment, the rebalance should return status DONE, 
the existing instance partitions should be
-    // removed, and the default instance partitions should be used
-    rebalanceConfig = new RebalanceConfig();
-    rebalanceConfig.setReassignInstances(true);
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
-    assertNull(InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
-        
InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME)));
-
-    // All servers should be assigned to the table
-    instanceAssignment = rebalanceResult.getInstanceAssignment();
-    assertEquals(instanceAssignment.size(), 1);
-    instancePartitions = 
instanceAssignment.get(InstancePartitionsType.OFFLINE);
-    assertEquals(instancePartitions.getNumReplicaGroups(), 1);
-    assertEquals(instancePartitions.getNumPartitions(), 1);
-    // Math.abs("testTable_OFFLINE".hashCode()) % 6 = 2
-    assertEquals(instancePartitions.getInstances(0, 0),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 3, SERVER_INSTANCE_ID_PREFIX + 4,
-            SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 0, 
SERVER_INSTANCE_ID_PREFIX + 1));
+      // Try pre-checks mode without dry-run set
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setPreChecks(true);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
+      assertNull(rebalanceResult.getRebalanceSummaryResult());
+      assertNull(rebalanceResult.getPreChecksResult());
 
-    // Segment assignment should not change as it is already balanced
-    assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment);
+      _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
 
-    // Remove the tag from the added servers
-    for (int i = 0; i < numServersToAdd; i++) {
-      _helixAdmin.removeInstanceTag(getHelixClusterName(), 
SERVER_INSTANCE_ID_PREFIX + (numServers + i),
-          TagNameUtils.getOfflineTagForTenant(null));
+      for (int i = 0; i < numServers; i++) {
+        stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+      }
+      for (int i = 0; i < numServersToAdd; i++) {
+        stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + (numServers + i));
+      }
+      executorService.shutdown();
     }
+  }
 
-    // Try dry-run summary mode
-    rebalanceConfig = new RebalanceConfig();
-    rebalanceConfig.setDryRun(true);
-    rebalanceConfig.setDowntime(true);
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
-    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
-    assertNotNull(rebalanceSummaryResult);
-    assertNotNull(rebalanceSummaryResult.getServerInfo());
-    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
-    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 15);
-    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
 15);
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
-    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 3);
-    assertNotNull(rebalanceSummaryResult.getTagsInfo());
-    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
-    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
-        TagNameUtils.getOfflineTagForTenant(null));
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 15);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
-        numSegments * NUM_REPLICAS - 15);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
-        numServers);
-    assertNotNull(rebalanceResult.getInstanceAssignment());
-    assertNotNull(rebalanceResult.getSegmentAssignment());
-
-    // Rebalance with downtime should succeed
-    rebalanceConfig = new RebalanceConfig();
-    rebalanceConfig.setDowntime(true);
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
-
-    // All servers with tag should be assigned to the table
-    instanceAssignment = rebalanceResult.getInstanceAssignment();
-    assertEquals(instanceAssignment.size(), 1);
-    instancePartitions = 
instanceAssignment.get(InstancePartitionsType.OFFLINE);
-    assertEquals(instancePartitions.getNumReplicaGroups(), 1);
-    assertEquals(instancePartitions.getNumPartitions(), 1);
-    // Math.abs("testTable_OFFLINE".hashCode()) % 3 = 2
-    assertEquals(instancePartitions.getInstances(0, 0),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 0, SERVER_INSTANCE_ID_PREFIX + 1));
+  @Test(timeOut = 60000)
+  public void testRebalanceStrictReplicaGroup()
+      throws Exception {
+    for (int batchSizePerServer : 
Arrays.asList(RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER, 3, 1)) {
+      int numServers = 3;
+      // Mock disk usage
+      Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>();
+
+      for (int i = 0; i < numServers; i++) {
+        String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
+        addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+        DiskUsageInfo diskUsageInfo1 =
+            new DiskUsageInfo(instanceId, "", 1000L, 500L, 
System.currentTimeMillis());
+        diskUsageInfoMap.put(instanceId, diskUsageInfo1);
+      }
 
-    // New segment assignment should not contain servers without tag
-    newSegmentAssignment = rebalanceResult.getSegmentAssignment();
-    for (int i = 0; i < numSegments; i++) {
-      String segmentName = SEGMENT_NAME_PREFIX + i;
-      Map<String, String> instanceStateMap = 
newSegmentAssignment.get(segmentName);
-      assertEquals(instanceStateMap.size(), NUM_REPLICAS);
-      for (int j = 0; j < numServersToAdd; j++) {
-        assertFalse(instanceStateMap.containsKey(SERVER_INSTANCE_ID_PREFIX + 
(numServers + j)));
+      ExecutorService executorService = Executors.newFixedThreadPool(10);
+      DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
+      preChecker.init(_helixResourceManager, executorService, 1);
+      TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, 
null, null, preChecker,
+          _helixResourceManager.getTableSizeReader());
+      // Set up the table with 1 replication factor and strict replica group enabled
+      TableConfig tableConfig =
+          new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1)
+              .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
+                  false)).build();
+
+      // Create the table
+      addDummySchema(RAW_TABLE_NAME);
+      _helixResourceManager.addTable(tableConfig);
+
+      // Add the segments
+      int numSegments = 10;
+      for (int i = 0; i < numSegments; i++) {
+        _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+            SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, 
SEGMENT_NAME_PREFIX + i), null);
+      }
+      Map<String, Map<String, String>> oldSegmentAssignment =
+          
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
+      for (Map.Entry<String, Map<String, String>> entry : 
oldSegmentAssignment.entrySet()) {
+        assertEquals(entry.getValue().size(), 1);
       }
-    }
 
-    // Try pre-checks mode without dry-run set
-    rebalanceConfig = new RebalanceConfig();
-    rebalanceConfig.setPreChecks(true);
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
-    assertNull(rebalanceResult.getRebalanceSummaryResult());
-    assertNull(rebalanceResult.getPreChecksResult());
+      // Rebalance should return NO_OP status since there has been no change
+      RebalanceConfig rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setBatchSizePerServer(batchSizePerServer);
+      RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+
+      // All servers should be assigned to the table
+      Map<InstancePartitionsType, InstancePartitions> instanceAssignment = 
rebalanceResult.getInstanceAssignment();
+      assertEquals(instanceAssignment.size(), 1);
+      InstancePartitions instancePartitions = 
instanceAssignment.get(InstancePartitionsType.OFFLINE);
+      assertEquals(instancePartitions.getNumReplicaGroups(), 1);
+      assertEquals(instancePartitions.getNumPartitions(), 1);
+      // Math.abs("testTable_OFFLINE".hashCode()) % 3 = 2
+      assertEquals(instancePartitions.getInstances(0, 0),
+          Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, 
SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1));
+
+      // Segment assignment should not change
+      assertEquals(rebalanceResult.getSegmentAssignment(), 
oldSegmentAssignment);
+
+      // Increase the replication factor to 3
+      tableConfig.getValidationConfig().setReplication("3");
+      rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setDryRun(false);
+      rebalanceConfig.setPreChecks(false);
+      rebalanceConfig.setReassignInstances(true);
+      rebalanceConfig.setBatchSizePerServer(batchSizePerServer);
+      // minAvailableReplicas = -1 results in minAvailableReplicas = target replication - 1 = 2 in this case
+      rebalanceConfig.setMinAvailableReplicas(-1);
+      rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
+      Map<String, Map<String, String>> newSegmentAssignment = 
rebalanceResult.getSegmentAssignment();
+      assertNotEquals(oldSegmentAssignment, newSegmentAssignment);
+      for (Map.Entry<String, Map<String, String>> entry : 
newSegmentAssignment.entrySet()) {
+        assertTrue(oldSegmentAssignment.containsKey(entry.getKey()));
+        assertEquals(entry.getValue().size(), 3);
+      }
 
-    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
+      _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
 
-    for (int i = 0; i < numServers; i++) {
-      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
-    }
-    for (int i = 0; i < numServersToAdd; i++) {
-      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + (numServers + i));
+      for (int i = 0; i < numServers; i++) {
+        stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+      }
+      executorService.shutdown();
     }
-    executorService.shutdown();
   }
 
-  @Test(timeOut = 60000)
-  public void testRebalanceStrictReplicaGroup()
+  @Test
+  public void testRebalanceBatchSizePerServerErrors()
       throws Exception {
     int numServers = 3;
     // Mock disk usage

Review Comment:
   Good catch. I've removed it from this test and from the other test where it wasn't used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to