somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2085023573
########## pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java: ########## @@ -105,568 +105,672 @@ public void setUp() @Test public void testRebalance() throws Exception { - int numServers = 3; - // Mock disk usage - Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>(); - - for (int i = 0; i < numServers; i++) { - String instanceId = SERVER_INSTANCE_ID_PREFIX + i; - addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true); - DiskUsageInfo diskUsageInfo1 = - new DiskUsageInfo(instanceId, "", 1000L, 500L, System.currentTimeMillis()); - diskUsageInfoMap.put(instanceId, diskUsageInfo1); - } - - ExecutorService executorService = Executors.newFixedThreadPool(10); - DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); - preChecker.init(_helixResourceManager, executorService, 1); - TableRebalancer tableRebalancer = - new TableRebalancer(_helixManager, null, null, preChecker, _helixResourceManager.getTableSizeReader()); - TableConfig tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build(); - - // Rebalance should fail without creating the table - RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); - assertNull(rebalanceResult.getRebalanceSummaryResult()); - - // Rebalance with dry-run summary should fail without creating the table - RebalanceConfig rebalanceConfig = new RebalanceConfig(); - rebalanceConfig.setDryRun(true); - rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); - assertNull(rebalanceResult.getRebalanceSummaryResult()); - - // Create the table - addDummySchema(RAW_TABLE_NAME); - _helixResourceManager.addTable(tableConfig); - - // Add the segments - int numSegments 
= 10; - for (int i = 0; i < numSegments; i++) { - _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + i), null); - } - Map<String, Map<String, String>> oldSegmentAssignment = - _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(); - - // Rebalance with dry-run summary should return NO_OP status - rebalanceConfig = new RebalanceConfig(); - rebalanceConfig.setDryRun(true); - rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); - RebalanceSummaryResult rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); - assertNotNull(rebalanceSummaryResult); - assertNotNull(rebalanceSummaryResult.getServerInfo()); - assertNotNull(rebalanceSummaryResult.getSegmentInfo()); - assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0); - assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 0); - assertNull(rebalanceSummaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary()); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); - assertNotNull(rebalanceSummaryResult.getTagsInfo()); - assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), - TagNameUtils.getOfflineTagForTenant(null)); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), numServers); - assertNotNull(rebalanceResult.getInstanceAssignment()); - 
assertNotNull(rebalanceResult.getSegmentAssignment()); - - // Dry-run mode should not change the IdealState - assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(), - oldSegmentAssignment); - - // Rebalance should return NO_OP status - rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); - - // All servers should be assigned to the table - Map<InstancePartitionsType, InstancePartitions> instanceAssignment = rebalanceResult.getInstanceAssignment(); - assertEquals(instanceAssignment.size(), 1); - InstancePartitions instancePartitions = instanceAssignment.get(InstancePartitionsType.OFFLINE); - assertEquals(instancePartitions.getNumReplicaGroups(), 1); - assertEquals(instancePartitions.getNumPartitions(), 1); - // Math.abs("testTable_OFFLINE".hashCode()) % 3 = 2 - assertEquals(instancePartitions.getInstances(0, 0), - Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1)); - - // Segment assignment should not change - assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment); - - // Add 3 more servers - int numServersToAdd = 3; - for (int i = 0; i < numServersToAdd; i++) { - String instanceId = SERVER_INSTANCE_ID_PREFIX + (numServers + i); - addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true); - DiskUsageInfo diskUsageInfo = - new DiskUsageInfo(instanceId, "", 1000L, 500L, System.currentTimeMillis()); - diskUsageInfoMap.put(instanceId, diskUsageInfo); - } - - ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap); - - // Rebalance in dry-run summary mode with added servers - rebalanceConfig = new RebalanceConfig(); - rebalanceConfig.setDryRun(true); - rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); - rebalanceSummaryResult = 
rebalanceResult.getRebalanceSummaryResult(); - assertNotNull(rebalanceSummaryResult); - assertNotNull(rebalanceSummaryResult.getServerInfo()); - assertNotNull(rebalanceSummaryResult.getSegmentInfo()); - assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 14); - assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 14); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 3); - assertNotNull(rebalanceSummaryResult.getTagsInfo()); - assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), - TagNameUtils.getOfflineTagForTenant(null)); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 14); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), - numSegments * NUM_REPLICAS - 14); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), - numServers + numServersToAdd); - assertNotNull(rebalanceResult.getInstanceAssignment()); - assertNotNull(rebalanceResult.getSegmentAssignment()); - - Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> serverSegmentChangeInfoMap = - rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo(); - assertNotNull(serverSegmentChangeInfoMap); - for (int i = 0; i < numServers; i++) { - // Original servers should be losing some segments - String newServer = SERVER_INSTANCE_ID_PREFIX + i; - RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = serverSegmentChangeInfoMap.get(newServer); - assertTrue(serverSegmentChange.getSegmentsDeleted() > 0); - assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0); - 
assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0); - assertTrue(serverSegmentChange.getTotalSegmentsAfterRebalance() > 0); - assertEquals(serverSegmentChange.getSegmentsAdded(), 0); - } - for (int i = 0; i < numServersToAdd; i++) { - // New servers should only get new segments - String newServer = SERVER_INSTANCE_ID_PREFIX + (numServers + i); - RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = serverSegmentChangeInfoMap.get(newServer); - assertTrue(serverSegmentChange.getSegmentsAdded() > 0); - assertEquals(serverSegmentChange.getTotalSegmentsBeforeRebalance(), 0); - assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), serverSegmentChange.getSegmentsAdded()); - assertEquals(serverSegmentChange.getSegmentsDeleted(), 0); - assertEquals(serverSegmentChange.getSegmentsUnchanged(), 0); - } - - // Dry-run mode should not change the IdealState - assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(), - oldSegmentAssignment); - - // Rebalance in dry-run mode - rebalanceConfig = new RebalanceConfig(); - rebalanceConfig.setDryRun(true); - rebalanceConfig.setPreChecks(true); - rebalanceConfig.setReassignInstances(true); - - rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); - Map<String, RebalancePreCheckerResult> preCheckResult = rebalanceResult.getPreChecksResult(); - assertNotNull(preCheckResult); - assertEquals(preCheckResult.size(), 6); - assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS)); - assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT)); - assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)); - assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)); - 
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS)); - assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO)); - // Sending request to servers should fail for all, so needsPreprocess should be set to "error" to indicate that a - // manual check is needed - assertEquals(preCheckResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS).getPreCheckStatus(), - RebalancePreCheckerResult.PreCheckStatus.ERROR); - assertEquals(preCheckResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS).getMessage(), - "Could not determine needReload status, run needReload API manually"); - assertEquals(preCheckResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getPreCheckStatus(), - RebalancePreCheckerResult.PreCheckStatus.PASS); - assertEquals(preCheckResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage(), - "Instance assignment not allowed, no need for minimizeDataMovement"); - assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE).getPreCheckStatus(), - RebalancePreCheckerResult.PreCheckStatus.PASS); - assertTrue( - preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE) - .getMessage() - .startsWith("Within threshold")); - assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE).getPreCheckStatus(), - RebalancePreCheckerResult.PreCheckStatus.PASS); - assertTrue( - preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE) - .getMessage() - .startsWith("Within threshold")); - assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getPreCheckStatus(), - RebalancePreCheckerResult.PreCheckStatus.PASS); - assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getMessage(), - "All rebalance parameters look good"); - 
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), - RebalancePreCheckerResult.PreCheckStatus.PASS); - assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), - "OFFLINE segments - Replica Groups are not enabled, replication: " + NUM_REPLICAS); - - // All servers should be assigned to the table - instanceAssignment = rebalanceResult.getInstanceAssignment(); - assertEquals(instanceAssignment.size(), 1); - instancePartitions = instanceAssignment.get(InstancePartitionsType.OFFLINE); - assertEquals(instancePartitions.getNumReplicaGroups(), 1); - assertEquals(instancePartitions.getNumPartitions(), 1); - // Math.abs("testTable_OFFLINE".hashCode()) % 6 = 2 - assertEquals(instancePartitions.getInstances(0, 0), - Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4, - SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1)); - - // Segments should be moved to the new added servers - Map<String, Map<String, String>> newSegmentAssignment = rebalanceResult.getSegmentAssignment(); - Map<String, IntIntPair> instanceToNumSegmentsToMoveMap = - SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(oldSegmentAssignment, newSegmentAssignment); - assertEquals(instanceToNumSegmentsToMoveMap.size(), numServers + numServersToAdd); - for (int i = 0; i < numServersToAdd; i++) { - IntIntPair numSegmentsToMove = instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + (numServers + i)); - assertNotNull(numSegmentsToMove); - assertTrue(numSegmentsToMove.leftInt() > 0); - assertEquals(numSegmentsToMove.rightInt(), 0); - } - for (int i = 0; i < numServers; i++) { - IntIntPair numSegmentsToMove = instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + i); - assertNotNull(numSegmentsToMove); - assertEquals(numSegmentsToMove.leftInt(), 0); - assertTrue(numSegmentsToMove.rightInt() > 0); - } - - // Dry-run 
mode should not change the IdealState - assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(), - oldSegmentAssignment); - - // Rebalance dry-run summary with 3 min available replicas should not be impacted since actual rebalance does not - // occur - rebalanceConfig = new RebalanceConfig(); - rebalanceConfig.setDryRun(true); - rebalanceConfig.setPreChecks(true); - rebalanceConfig.setMinAvailableReplicas(3); - rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); - rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); - assertNotNull(rebalanceSummaryResult); - assertNotNull(rebalanceSummaryResult.getServerInfo()); - assertNotNull(rebalanceSummaryResult.getSegmentInfo()); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); - assertNotNull(rebalanceResult.getPreChecksResult()); - assertNotNull(rebalanceResult.getInstanceAssignment()); - assertNotNull(rebalanceResult.getSegmentAssignment()); - - // Rebalance with 3 min available replicas should fail as the table only have 3 replicas - rebalanceConfig = new RebalanceConfig(); - rebalanceConfig.setMinAvailableReplicas(3); - rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); - - // IdealState should not change for FAILED rebalance - assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(), - oldSegmentAssignment); - - // Rebalance with 2 min available replicas should succeed - rebalanceConfig = new RebalanceConfig(); - rebalanceConfig.setMinAvailableReplicas(2); - rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); - 
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); - - // Result should be the same as the result in dry-run mode - instanceAssignment = rebalanceResult.getInstanceAssignment(); - assertEquals(instanceAssignment.size(), 1); - assertEquals(instanceAssignment.get(InstancePartitionsType.OFFLINE).getPartitionToInstancesMap(), - instancePartitions.getPartitionToInstancesMap()); - assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment); - - // Update the table config to use replica-group based assignment - InstanceTagPoolConfig tagPoolConfig = - new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), false, 0, null); - InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = - new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false, null); - tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); - _helixResourceManager.updateTableConfig(tableConfig); - - // Try dry-run summary mode - // No need to reassign instances because instances should be automatically assigned when updating the table config - rebalanceConfig = new RebalanceConfig(); - rebalanceConfig.setDryRun(true); - rebalanceConfig.setPreChecks(true); - rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); - // Though instance partition map is set in ZK, the pre-checker is unaware of that, a warning will be thrown - assertEquals( - rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), - RebalancePreCheckerResult.PreCheckStatus.WARN); - assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), - "reassignInstances is disabled, replica groups may not be updated.\nOFFLINE 
segments - numReplicaGroups: " - + NUM_REPLICAS + ", numInstancesPerReplicaGroup: 0 (using as many instances as possible)"); - rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); - assertNotNull(rebalanceSummaryResult); - assertNotNull(rebalanceSummaryResult.getServerInfo()); - assertNotNull(rebalanceSummaryResult.getSegmentInfo()); - assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 11); - assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 11); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); - assertNotNull(rebalanceSummaryResult.getTagsInfo()); - assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), - TagNameUtils.getOfflineTagForTenant(null)); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 11); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), - numSegments * NUM_REPLICAS - 11); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), - numServers + numServersToAdd); - assertNotNull(rebalanceResult.getInstanceAssignment()); - assertNotNull(rebalanceResult.getSegmentAssignment()); - - serverSegmentChangeInfoMap = rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo(); - assertNotNull(serverSegmentChangeInfoMap); - for (int i = 0; i < numServers + numServersToAdd; i++) { - String newServer = SERVER_INSTANCE_ID_PREFIX + i; - RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = serverSegmentChangeInfoMap.get(newServer); - assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 5); - // Ensure not all segments moved - assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0); - // Ensure all segments has 
something assigned prior to rebalance - assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0); - } - - // Dry-run mode should not change the IdealState - assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(), - newSegmentAssignment); - - // Try actual rebalance - // No need to reassign instances because instances should be automatically assigned when updating the table config - rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + for (int batchSizePerServer : Arrays.asList(RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER, 1, 2)) { + int numServers = 3; + // Mock disk usage + Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>(); + + for (int i = 0; i < numServers; i++) { + String instanceId = SERVER_INSTANCE_ID_PREFIX + i; + addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true); + DiskUsageInfo diskUsageInfo1 = + new DiskUsageInfo(instanceId, "", 1000L, 500L, System.currentTimeMillis()); + diskUsageInfoMap.put(instanceId, diskUsageInfo1); + } - // There should be 3 replica-groups, each with 2 servers - instanceAssignment = rebalanceResult.getInstanceAssignment(); - assertEquals(instanceAssignment.size(), 1); - instancePartitions = instanceAssignment.get(InstancePartitionsType.OFFLINE); - assertEquals(instancePartitions.getNumReplicaGroups(), NUM_REPLICAS); - assertEquals(instancePartitions.getNumPartitions(), 1); - // Math.abs("testTable_OFFLINE".hashCode()) % 6 = 2 - // [i2, i3, i4, i5, i0, i1] - // r0 r1 r2 r0 r1 r2 - assertEquals(instancePartitions.getInstances(0, 0), - Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 5)); - assertEquals(instancePartitions.getInstances(0, 1), - Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 0)); - assertEquals(instancePartitions.getInstances(0, 2), - Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, 
SERVER_INSTANCE_ID_PREFIX + 1)); + ExecutorService executorService = Executors.newFixedThreadPool(10); + DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); + preChecker.init(_helixResourceManager, executorService, 1); + TableRebalancer tableRebalancer = + new TableRebalancer(_helixManager, null, null, preChecker, _helixResourceManager.getTableSizeReader()); + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build(); + + // Rebalance should fail without creating the table + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setBatchSizePerServer(batchSizePerServer); + RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); + assertNull(rebalanceResult.getRebalanceSummaryResult()); + + // Rebalance with dry-run summary should fail without creating the table + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); + assertNull(rebalanceResult.getRebalanceSummaryResult()); + + // Create the table + addDummySchema(RAW_TABLE_NAME); + _helixResourceManager.addTable(tableConfig); + + // Add the segments + int numSegments = 10; + for (int i = 0; i < numSegments; i++) { + _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + i), null); + } + Map<String, Map<String, String>> oldSegmentAssignment = + _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(); + + // Rebalance with dry-run summary should return NO_OP status + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceResult = 
tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + RebalanceSummaryResult rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 0); + assertNull(rebalanceSummaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary()); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), numServers); + assertNotNull(rebalanceResult.getInstanceAssignment()); + assertNotNull(rebalanceResult.getSegmentAssignment()); + + // Dry-run mode should not change the IdealState + assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(), + oldSegmentAssignment); + + // Rebalance should return NO_OP status + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setBatchSizePerServer(batchSizePerServer); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), 
RebalanceResult.Status.NO_OP); + + // All servers should be assigned to the table + Map<InstancePartitionsType, InstancePartitions> instanceAssignment = rebalanceResult.getInstanceAssignment(); + assertEquals(instanceAssignment.size(), 1); + InstancePartitions instancePartitions = instanceAssignment.get(InstancePartitionsType.OFFLINE); + assertEquals(instancePartitions.getNumReplicaGroups(), 1); + assertEquals(instancePartitions.getNumPartitions(), 1); + // Math.abs("testTable_OFFLINE".hashCode()) % 3 = 2 + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1)); + + // Segment assignment should not change + assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment); + + // Add 3 more servers + int numServersToAdd = 3; + for (int i = 0; i < numServersToAdd; i++) { + String instanceId = SERVER_INSTANCE_ID_PREFIX + (numServers + i); + addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true); + DiskUsageInfo diskUsageInfo = + new DiskUsageInfo(instanceId, "", 1000L, 500L, System.currentTimeMillis()); + diskUsageInfoMap.put(instanceId, diskUsageInfo); + } - // The assignment are based on replica-group 0 and mirrored to all the replica-groups, so server of index 0, 1, 5 - // should have the same segments assigned, and server of index 2, 3, 4 should have the same segments assigned, each - // with 5 segments - newSegmentAssignment = rebalanceResult.getSegmentAssignment(); - int numSegmentsOnServer0 = 0; - for (int i = 0; i < numSegments; i++) { - String segmentName = SEGMENT_NAME_PREFIX + i; - Map<String, String> instanceStateMap = newSegmentAssignment.get(segmentName); - assertEquals(instanceStateMap.size(), NUM_REPLICAS); - if (instanceStateMap.containsKey(SERVER_INSTANCE_ID_PREFIX + 0)) { - numSegmentsOnServer0++; - assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 0), ONLINE); - 
assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 1), ONLINE); - assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 5), ONLINE); - } else { - assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 2), ONLINE); - assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 3), ONLINE); - assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 4), ONLINE); + ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap); + + // Rebalance in dry-run summary mode with added servers + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 14); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 14); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 3); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 14); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), + numSegments * NUM_REPLICAS - 14); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), 
+ numServers + numServersToAdd); + assertNotNull(rebalanceResult.getInstanceAssignment()); + assertNotNull(rebalanceResult.getSegmentAssignment()); + + Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> serverSegmentChangeInfoMap = + rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo(); + assertNotNull(serverSegmentChangeInfoMap); + for (int i = 0; i < numServers; i++) { + // Original servers should be losing some segments + String newServer = SERVER_INSTANCE_ID_PREFIX + i; + RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = serverSegmentChangeInfoMap.get(newServer); + assertTrue(serverSegmentChange.getSegmentsDeleted() > 0); + assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0); + assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0); + assertTrue(serverSegmentChange.getTotalSegmentsAfterRebalance() > 0); + assertEquals(serverSegmentChange.getSegmentsAdded(), 0); + } + for (int i = 0; i < numServersToAdd; i++) { + // New servers should only get new segments + String newServer = SERVER_INSTANCE_ID_PREFIX + (numServers + i); + RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = serverSegmentChangeInfoMap.get(newServer); + assertTrue(serverSegmentChange.getSegmentsAdded() > 0); + assertEquals(serverSegmentChange.getTotalSegmentsBeforeRebalance(), 0); + assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), serverSegmentChange.getSegmentsAdded()); + assertEquals(serverSegmentChange.getSegmentsDeleted(), 0); + assertEquals(serverSegmentChange.getSegmentsUnchanged(), 0); } - } - assertEquals(numSegmentsOnServer0, numSegments / 2); - // Update the table config to use non-replica-group based assignment - tableConfig.setInstanceAssignmentConfigMap(null); - _helixResourceManager.updateTableConfig(tableConfig); + // Dry-run mode should not change the IdealState + assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(), + 
oldSegmentAssignment); + + // Rebalance in dry-run mode + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setPreChecks(true); + rebalanceConfig.setReassignInstances(true); + + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + Map<String, RebalancePreCheckerResult> preCheckResult = rebalanceResult.getPreChecksResult(); + assertNotNull(preCheckResult); + assertEquals(preCheckResult.size(), 6); + assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS)); + assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT)); + assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)); + assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)); + assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS)); + assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO)); + // Sending request to servers should fail for all, so needsPreprocess should be set to "error" to indicate that a + // manual check is needed + assertEquals(preCheckResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.ERROR); + assertEquals(preCheckResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS).getMessage(), + "Could not determine needReload status, run needReload API manually"); + assertEquals(preCheckResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + assertEquals(preCheckResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage(), + "Instance assignment not allowed, no need for minimizeDataMovement"); + 
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + assertTrue( + preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE) + .getMessage() + .startsWith("Within threshold")); + assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + assertTrue( + preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE) + .getMessage() + .startsWith("Within threshold")); + assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getMessage(), + "All rebalance parameters look good"); + assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), + "OFFLINE segments - Replica Groups are not enabled, replication: " + NUM_REPLICAS); + + // All servers should be assigned to the table + instanceAssignment = rebalanceResult.getInstanceAssignment(); + assertEquals(instanceAssignment.size(), 1); + instancePartitions = instanceAssignment.get(InstancePartitionsType.OFFLINE); + assertEquals(instancePartitions.getNumReplicaGroups(), 1); + assertEquals(instancePartitions.getNumPartitions(), 1); + // Math.abs("testTable_OFFLINE".hashCode()) % 6 = 2 + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4, + SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1)); + + // Segments should be moved to the new added servers + 
Map<String, Map<String, String>> newSegmentAssignment = rebalanceResult.getSegmentAssignment(); + Map<String, IntIntPair> instanceToNumSegmentsToMoveMap = + SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(oldSegmentAssignment, newSegmentAssignment); + assertEquals(instanceToNumSegmentsToMoveMap.size(), numServers + numServersToAdd); + for (int i = 0; i < numServersToAdd; i++) { + IntIntPair numSegmentsToMove = instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + (numServers + i)); + assertNotNull(numSegmentsToMove); + assertTrue(numSegmentsToMove.leftInt() > 0); + assertEquals(numSegmentsToMove.rightInt(), 0); + } + for (int i = 0; i < numServers; i++) { + IntIntPair numSegmentsToMove = instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + i); + assertNotNull(numSegmentsToMove); + assertEquals(numSegmentsToMove.leftInt(), 0); + assertTrue(numSegmentsToMove.rightInt() > 0); + } - // Try dry-run summary mode without reassignment to ensure that existing instance partitions are used - // no movement should occur - rebalanceConfig = new RebalanceConfig(); - rebalanceConfig.setDryRun(true); - rebalanceConfig.setPreChecks(true); - rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); - assertEquals( - rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), - RebalancePreCheckerResult.PreCheckStatus.PASS); - assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), - "OFFLINE segments - Replica Groups are not enabled, replication: " + NUM_REPLICAS); - rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); - assertNotNull(rebalanceSummaryResult); - assertNotNull(rebalanceSummaryResult.getServerInfo()); - assertNotNull(rebalanceSummaryResult.getSegmentInfo()); - 
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0); - assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 0); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); - assertNotNull(rebalanceSummaryResult.getTagsInfo()); - assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), - TagNameUtils.getOfflineTagForTenant(null)); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), - numSegments * NUM_REPLICAS); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), - numServers + numServersToAdd); - assertNotNull(rebalanceResult.getInstanceAssignment()); - assertNotNull(rebalanceResult.getSegmentAssignment()); + // Dry-run mode should not change the IdealState + assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(), + oldSegmentAssignment); + + // Rebalance dry-run summary with 3 min available replicas should not be impacted since actual rebalance does not + // occur + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setPreChecks(true); + rebalanceConfig.setMinAvailableReplicas(3); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + 
assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); + assertNotNull(rebalanceResult.getPreChecksResult()); + assertNotNull(rebalanceResult.getInstanceAssignment()); + assertNotNull(rebalanceResult.getSegmentAssignment()); + + // Rebalance with 3 min available replicas should fail as the table only have 3 replicas + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setMinAvailableReplicas(3); + rebalanceConfig.setBatchSizePerServer(batchSizePerServer); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); + + // IdealState should not change for FAILED rebalance + assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(), + oldSegmentAssignment); + + // Rebalance with 2 min available replicas should succeed + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setMinAvailableReplicas(2); + rebalanceConfig.setBatchSizePerServer(batchSizePerServer); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + + // Result should be the same as the result in dry-run mode + instanceAssignment = rebalanceResult.getInstanceAssignment(); + assertEquals(instanceAssignment.size(), 1); + assertEquals(instanceAssignment.get(InstancePartitionsType.OFFLINE).getPartitionToInstancesMap(), + instancePartitions.getPartitionToInstancesMap()); + assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment); + + // Update the table config to use replica-group based assignment + InstanceTagPoolConfig tagPoolConfig = + new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), false, 0, null); + 
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false, null); + tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); + _helixResourceManager.updateTableConfig(tableConfig); + + // Try dry-run summary mode + // No need to reassign instances because instances should be automatically assigned when updating the table config + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setPreChecks(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + // Though instance partition map is set in ZK, the pre-checker is unaware of that, a warning will be thrown + assertEquals( + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.WARN); + assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO) + .getMessage(), "reassignInstances is disabled, replica groups may not be updated.\nOFFLINE segments " + + "- numReplicaGroups: " + NUM_REPLICAS + ", numInstancesPerReplicaGroup: 0 (using as many instances as " + + "possible)"); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 11); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 11); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); + 
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 11); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), + numSegments * NUM_REPLICAS - 11); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), + numServers + numServersToAdd); + assertNotNull(rebalanceResult.getInstanceAssignment()); + assertNotNull(rebalanceResult.getSegmentAssignment()); + + serverSegmentChangeInfoMap = rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo(); + assertNotNull(serverSegmentChangeInfoMap); + for (int i = 0; i < numServers + numServersToAdd; i++) { + String newServer = SERVER_INSTANCE_ID_PREFIX + i; + RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = serverSegmentChangeInfoMap.get(newServer); + assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 5); + // Ensure not all segments moved + assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0); + // Ensure all segments has something assigned prior to rebalance + assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0); + } - // Without instances reassignment, the rebalance should return status NO_OP, and the existing instance partitions - // should be used - rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); - assertEquals(rebalanceResult.getInstanceAssignment(), instanceAssignment); - assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment); + // Dry-run mode should not change the IdealState + 
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(), + newSegmentAssignment); + + // Try actual rebalance + // No need to reassign instances because instances should be automatically assigned when updating the table config + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setBatchSizePerServer(batchSizePerServer); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + + // There should be 3 replica-groups, each with 2 servers + instanceAssignment = rebalanceResult.getInstanceAssignment(); + assertEquals(instanceAssignment.size(), 1); + instancePartitions = instanceAssignment.get(InstancePartitionsType.OFFLINE); + assertEquals(instancePartitions.getNumReplicaGroups(), NUM_REPLICAS); + assertEquals(instancePartitions.getNumPartitions(), 1); + // Math.abs("testTable_OFFLINE".hashCode()) % 6 = 2 + // [i2, i3, i4, i5, i0, i1] + // r0 r1 r2 r0 r1 r2 + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 5)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 0)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 1)); + + // The assignment are based on replica-group 0 and mirrored to all the replica-groups, so server of index 0, 1, 5 + // should have the same segments assigned, and server of index 2, 3, 4 should have the same segments assigned, + // each with 5 segments + newSegmentAssignment = rebalanceResult.getSegmentAssignment(); + int numSegmentsOnServer0 = 0; + for (int i = 0; i < numSegments; i++) { + String segmentName = SEGMENT_NAME_PREFIX + i; + Map<String, String> instanceStateMap = newSegmentAssignment.get(segmentName); + assertEquals(instanceStateMap.size(), 
NUM_REPLICAS); + if (instanceStateMap.containsKey(SERVER_INSTANCE_ID_PREFIX + 0)) { + numSegmentsOnServer0++; + assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 0), ONLINE); + assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 1), ONLINE); + assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 5), ONLINE); + } else { + assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 2), ONLINE); + assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 3), ONLINE); + assertEquals(instanceStateMap.get(SERVER_INSTANCE_ID_PREFIX + 4), ONLINE); + } + } + assertEquals(numSegmentsOnServer0, numSegments / 2); + + // Update the table config to use non-replica-group based assignment + tableConfig.setInstanceAssignmentConfigMap(null); + _helixResourceManager.updateTableConfig(tableConfig); + + // Try dry-run summary mode without reassignment to ensure that existing instance partitions are used + // no movement should occur + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setPreChecks(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + assertEquals( + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO) + .getMessage(), "OFFLINE segments - Replica Groups are not enabled, replication: " + NUM_REPLICAS); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0); + 
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 0); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), + numSegments * NUM_REPLICAS); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), + numServers + numServersToAdd); + assertNotNull(rebalanceResult.getInstanceAssignment()); + assertNotNull(rebalanceResult.getSegmentAssignment()); + + // Without instances reassignment, the rebalance should return status NO_OP, and the existing instance partitions + // should be used + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setBatchSizePerServer(batchSizePerServer); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + assertEquals(rebalanceResult.getInstanceAssignment(), instanceAssignment); + assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment); + + // Try dry-run summary mode with reassignment + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setPreChecks(true); + rebalanceConfig.setReassignInstances(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + 
assertEquals( + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO) + .getMessage(), "OFFLINE segments - Replica Groups are not enabled, replication: " + NUM_REPLICAS); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + // No move expected since already balanced + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 0); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), + numSegments * NUM_REPLICAS); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), + numServers + numServersToAdd); + assertNotNull(rebalanceResult.getInstanceAssignment()); + assertNotNull(rebalanceResult.getSegmentAssignment()); + + // With instances reassignment, the rebalance should return status DONE, the existing instance partitions should + // be removed, and the default instance partitions should be used 
+ rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setBatchSizePerServer(batchSizePerServer); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + assertNull(InstancePartitionsUtils.fetchInstancePartitions(_propertyStore, + InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME))); + + // All servers should be assigned to the table + instanceAssignment = rebalanceResult.getInstanceAssignment(); + assertEquals(instanceAssignment.size(), 1); + instancePartitions = instanceAssignment.get(InstancePartitionsType.OFFLINE); + assertEquals(instancePartitions.getNumReplicaGroups(), 1); + assertEquals(instancePartitions.getNumPartitions(), 1); + // Math.abs("testTable_OFFLINE".hashCode()) % 6 = 2 + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4, + SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1)); + + // Segment assignment should not change as it is already balanced + assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment); + + // Remove the tag from the added servers + for (int i = 0; i < numServersToAdd; i++) { + _helixAdmin.removeInstanceTag(getHelixClusterName(), SERVER_INSTANCE_ID_PREFIX + (numServers + i), + TagNameUtils.getOfflineTagForTenant(null)); + } - // Try dry-run summary mode with reassignment - rebalanceConfig = new RebalanceConfig(); - rebalanceConfig.setDryRun(true); - rebalanceConfig.setPreChecks(true); - rebalanceConfig.setReassignInstances(true); - rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); - assertEquals( - 
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), - RebalancePreCheckerResult.PreCheckStatus.PASS); - assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), - "OFFLINE segments - Replica Groups are not enabled, replication: " + NUM_REPLICAS); - rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); - assertNotNull(rebalanceSummaryResult); - assertNotNull(rebalanceSummaryResult.getServerInfo()); - assertNotNull(rebalanceSummaryResult.getSegmentInfo()); - // No move expected since already balanced - assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0); - assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 0); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); - assertNotNull(rebalanceSummaryResult.getTagsInfo()); - assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), - TagNameUtils.getOfflineTagForTenant(null)); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), - numSegments * NUM_REPLICAS); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), - numServers + numServersToAdd); - assertNotNull(rebalanceResult.getInstanceAssignment()); - assertNotNull(rebalanceResult.getSegmentAssignment()); + // Try dry-run summary mode + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setDowntime(true); + rebalanceResult = 
tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 15); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 15); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 3); + assertNotNull(rebalanceSummaryResult.getTagsInfo()); + assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 15); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), + numSegments * NUM_REPLICAS - 15); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), + numServers); + assertNotNull(rebalanceResult.getInstanceAssignment()); + assertNotNull(rebalanceResult.getSegmentAssignment()); + + // Rebalance with downtime should succeed + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDowntime(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + + // All servers with tag should be assigned to the table + instanceAssignment = rebalanceResult.getInstanceAssignment(); + assertEquals(instanceAssignment.size(), 1); + instancePartitions = 
instanceAssignment.get(InstancePartitionsType.OFFLINE); + assertEquals(instancePartitions.getNumReplicaGroups(), 1); + assertEquals(instancePartitions.getNumPartitions(), 1); + // Math.abs("testTable_OFFLINE".hashCode()) % 3 = 2 + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1)); + + // New segment assignment should not contain servers without tag + newSegmentAssignment = rebalanceResult.getSegmentAssignment(); + for (int i = 0; i < numSegments; i++) { + String segmentName = SEGMENT_NAME_PREFIX + i; + Map<String, String> instanceStateMap = newSegmentAssignment.get(segmentName); + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + for (int j = 0; j < numServersToAdd; j++) { + assertFalse(instanceStateMap.containsKey(SERVER_INSTANCE_ID_PREFIX + (numServers + j))); + } + } - // With instances reassignment, the rebalance should return status DONE, the existing instance partitions should be - // removed, and the default instance partitions should be used - rebalanceConfig = new RebalanceConfig(); - rebalanceConfig.setReassignInstances(true); - rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); - assertNull(InstancePartitionsUtils.fetchInstancePartitions(_propertyStore, - InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME))); - - // All servers should be assigned to the table - instanceAssignment = rebalanceResult.getInstanceAssignment(); - assertEquals(instanceAssignment.size(), 1); - instancePartitions = instanceAssignment.get(InstancePartitionsType.OFFLINE); - assertEquals(instancePartitions.getNumReplicaGroups(), 1); - assertEquals(instancePartitions.getNumPartitions(), 1); - // Math.abs("testTable_OFFLINE".hashCode()) % 6 = 2 - assertEquals(instancePartitions.getInstances(0, 0), - Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, 
SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4, - SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1)); + // Try pre-checks mode without dry-run set + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setPreChecks(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); + assertNull(rebalanceResult.getRebalanceSummaryResult()); + assertNull(rebalanceResult.getPreChecksResult()); - // Segment assignment should not change as it is already balanced - assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment); + _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME); - // Remove the tag from the added servers - for (int i = 0; i < numServersToAdd; i++) { - _helixAdmin.removeInstanceTag(getHelixClusterName(), SERVER_INSTANCE_ID_PREFIX + (numServers + i), - TagNameUtils.getOfflineTagForTenant(null)); + for (int i = 0; i < numServers; i++) { + stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i); + } + for (int i = 0; i < numServersToAdd; i++) { + stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + (numServers + i)); + } + executorService.shutdown(); } + } - // Try dry-run summary mode - rebalanceConfig = new RebalanceConfig(); - rebalanceConfig.setDryRun(true); - rebalanceConfig.setDowntime(true); - rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); - rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); - assertNotNull(rebalanceSummaryResult); - assertNotNull(rebalanceSummaryResult.getServerInfo()); - assertNotNull(rebalanceSummaryResult.getSegmentInfo()); - assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 15); - assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 15); - 
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); - assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 3); - assertNotNull(rebalanceSummaryResult.getTagsInfo()); - assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), - TagNameUtils.getOfflineTagForTenant(null)); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 15); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), - numSegments * NUM_REPLICAS - 15); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), - numServers); - assertNotNull(rebalanceResult.getInstanceAssignment()); - assertNotNull(rebalanceResult.getSegmentAssignment()); - - // Rebalance with downtime should succeed - rebalanceConfig = new RebalanceConfig(); - rebalanceConfig.setDowntime(true); - rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); - - // All servers with tag should be assigned to the table - instanceAssignment = rebalanceResult.getInstanceAssignment(); - assertEquals(instanceAssignment.size(), 1); - instancePartitions = instanceAssignment.get(InstancePartitionsType.OFFLINE); - assertEquals(instancePartitions.getNumReplicaGroups(), 1); - assertEquals(instancePartitions.getNumPartitions(), 1); - // Math.abs("testTable_OFFLINE".hashCode()) % 3 = 2 - assertEquals(instancePartitions.getInstances(0, 0), - Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1)); + @Test(timeOut = 60000) + public void testRebalanceStrictReplicaGroup() + throws Exception { + for (int batchSizePerServer : 
Arrays.asList(RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER, 3, 1)) { + int numServers = 3; + // Mock disk usage + Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>(); + + for (int i = 0; i < numServers; i++) { + String instanceId = SERVER_INSTANCE_ID_PREFIX + i; + addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true); + DiskUsageInfo diskUsageInfo1 = + new DiskUsageInfo(instanceId, "", 1000L, 500L, System.currentTimeMillis()); + diskUsageInfoMap.put(instanceId, diskUsageInfo1); + } - // New segment assignment should not contain servers without tag - newSegmentAssignment = rebalanceResult.getSegmentAssignment(); - for (int i = 0; i < numSegments; i++) { - String segmentName = SEGMENT_NAME_PREFIX + i; - Map<String, String> instanceStateMap = newSegmentAssignment.get(segmentName); - assertEquals(instanceStateMap.size(), NUM_REPLICAS); - for (int j = 0; j < numServersToAdd; j++) { - assertFalse(instanceStateMap.containsKey(SERVER_INSTANCE_ID_PREFIX + (numServers + j))); + ExecutorService executorService = Executors.newFixedThreadPool(10); + DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); + preChecker.init(_helixResourceManager, executorService, 1); + TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker, + _helixResourceManager.getTableSizeReader()); + // Set up the table with 1 replication factor and strict replica group enabled + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1) + .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, + false)).build(); + + // Create the table + addDummySchema(RAW_TABLE_NAME); + _helixResourceManager.addTable(tableConfig); + + // Add the segments + int numSegments = 10; + for (int i = 0; i < numSegments; i++) { + _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, 
SEGMENT_NAME_PREFIX + i), null); + } + Map<String, Map<String, String>> oldSegmentAssignment = + _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(); + for (Map.Entry<String, Map<String, String>> entry : oldSegmentAssignment.entrySet()) { + assertEquals(entry.getValue().size(), 1); } - } - // Try pre-checks mode without dry-run set - rebalanceConfig = new RebalanceConfig(); - rebalanceConfig.setPreChecks(true); - rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); - assertNull(rebalanceResult.getRebalanceSummaryResult()); - assertNull(rebalanceResult.getPreChecksResult()); + // Rebalance should return NO_OP status since there has been no change + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setBatchSizePerServer(batchSizePerServer); + RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + + // All servers should be assigned to the table + Map<InstancePartitionsType, InstancePartitions> instanceAssignment = rebalanceResult.getInstanceAssignment(); + assertEquals(instanceAssignment.size(), 1); + InstancePartitions instancePartitions = instanceAssignment.get(InstancePartitionsType.OFFLINE); + assertEquals(instancePartitions.getNumReplicaGroups(), 1); + assertEquals(instancePartitions.getNumPartitions(), 1); + // Math.abs("testTable_OFFLINE".hashCode()) % 3 = 2 + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1)); + + // Segment assignment should not change + assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment); + + // Increase the replication factor to 3 + tableConfig.getValidationConfig().setReplication("3"); + rebalanceConfig = new RebalanceConfig(); 
+ rebalanceConfig.setDryRun(false); + rebalanceConfig.setPreChecks(false); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setBatchSizePerServer(batchSizePerServer); + // minAvailableReplicas = -1 results in minAvailableReplicas = target replication - 1 = 2 in this case + rebalanceConfig.setMinAvailableReplicas(-1); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + + Map<String, Map<String, String>> newSegmentAssignment = rebalanceResult.getSegmentAssignment(); + assertNotEquals(oldSegmentAssignment, newSegmentAssignment); + for (Map.Entry<String, Map<String, String>> entry : newSegmentAssignment.entrySet()) { + assertTrue(oldSegmentAssignment.containsKey(entry.getKey())); + assertEquals(entry.getValue().size(), 3); + } - _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME); + _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME); - for (int i = 0; i < numServers; i++) { - stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i); - } - for (int i = 0; i < numServersToAdd; i++) { - stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + (numServers + i)); + for (int i = 0; i < numServers; i++) { + stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i); + } + executorService.shutdown(); } - executorService.shutdown(); } - @Test(timeOut = 60000) - public void testRebalanceStrictReplicaGroup() + @Test + public void testRebalanceBatchSizePerServerErrors() throws Exception { int numServers = 3; // Mock disk usage Review Comment: good catch, I've removed it from this test and the other test where it wasn't used -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org