somandal commented on code in PR #15110:
URL: https://github.com/apache/pinot/pull/15110#discussion_r1970742135
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java:
##########
@@ -504,6 +505,208 @@ public void testMirrorServerSetBasedRandom() throws
FileNotFoundException {
testMirrorServerSetBasedRandomInner(10000000);
}
+ @Test
+ public void testForceMinimizeDataMovement() {
+ // This test case is using the same instance rebalance plot as
testMinimizeDataMovement, and test whether
+ // forceMinimizeDataMovement flag in InstanceAssignmentDriver works as the
minimizeDataMovement flag in
+ // TableConfig does.
+ int numReplicas = 3;
+ int numPartitions = 2;
+ int numInstancesPerPartition = 2;
+ String partitionColumn = "partition";
+
+ // Configs and driver that minimize data movement
+ InstanceAssignmentConfig instanceAssignmentConfig = new
InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, false,
+ partitionColumn), null, true);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(numReplicas)
+ .setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig))
+ .build();
+
+ InstanceAssignmentDriver driver = new
InstanceAssignmentDriver(tableConfig);
+
+ // Configs and driver that DO NOT minimize data movement
+ InstanceAssignmentConfig instanceAssignmentConfigNotMinimized = new
InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, false,
+ partitionColumn), null, false);
+
+ TableConfig tableConfigNotMinimized = new TableConfig(tableConfig);
+ tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfigNotMinimized));
+ InstanceAssignmentDriver driverNotMinimized = new
InstanceAssignmentDriver(tableConfigNotMinimized);
Review Comment:
Can you add an assert that checks that this TableConfig has
minimizeDataMovement = false?
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -886,6 +886,188 @@ public void testRebalanceWithTiersAndInstanceAssignments()
_helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
}
+
+ @Test
+ public void testRebalanceWithMinimizeDataMovementInstanceAssignments()
+ throws Exception {
+ // TODO: try balanced (default) instance assignment
+ int numServers = 6;
+ for (int i = 0; i < numServers; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
i, true);
+ }
+
+ // Create the table with default balanced segment assignment
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+
+ addDummySchema(RAW_TABLE_NAME);
+ _helixResourceManager.addTable(tableConfig);
+
+ // Add the segments
+ int numSegments = 10;
+ long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+ for (int i = 0; i < numSegments; i++) {
+ _helixResourceManager.addNewSegment(OFFLINE_TIERED_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(TIERED_TABLE_NAME,
SEGMENT_NAME_PREFIX + i,
+ nowInDays), null);
+ }
+
+ Map<String, Map<String, String>> oldSegmentAssignment =
+
_helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields();
+
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
+
+ // Try dry-run summary mode
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceConfig.setMinimizeDataMovement(true);
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+
+ RebalanceSummaryResult rebalanceSummaryResult =
rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ RebalanceSummaryResult.ServerInfo rebalanceServerInfo =
rebalanceSummaryResult.getServerInfo();
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+
assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(),
numServers);
+
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig(), null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ // Segment assignment should not change
+ assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+ // add one server instance
+ addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
numServers, true);
+
+ // Table without instance assignment config should work fine (ignore) with
the minimizeDataMovement flag set
+ // Try dry-run summary mode
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceConfig.setMinimizeDataMovement(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+
+ rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ rebalanceServerInfo = rebalanceSummaryResult.getServerInfo();
+ // Should see the added server
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
assertEquals(rebalanceServerInfo.getNumServers().getValueBeforeRebalance(),
numServers);
+
assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(),
numServers + 1);
Review Comment:
Is it possible to validate that the expected changes are the same for
`setMinimizeDataMovement(true)` and `setMinimizeDataMovement(false)`? This
table has no instance assignment config, so the flag should have no effect here.
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java:
##########
@@ -504,6 +505,208 @@ public void testMirrorServerSetBasedRandom() throws
FileNotFoundException {
testMirrorServerSetBasedRandomInner(10000000);
}
+ @Test
+ public void testForceMinimizeDataMovement() {
+ // This test case is using the same instance rebalance plot as
testMinimizeDataMovement, and test whether
+ // forceMinimizeDataMovement flag in InstanceAssignmentDriver works as the
minimizeDataMovement flag in
+ // TableConfig does.
+ int numReplicas = 3;
+ int numPartitions = 2;
+ int numInstancesPerPartition = 2;
+ String partitionColumn = "partition";
+
+ // Configs and driver that minimize data movement
+ InstanceAssignmentConfig instanceAssignmentConfig = new
InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, false,
+ partitionColumn), null, true);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(numReplicas)
+ .setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig))
+ .build();
Review Comment:
Can you add an assert that checks that this TableConfig has
minimizeDataMovement = true?
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -886,6 +886,188 @@ public void testRebalanceWithTiersAndInstanceAssignments()
_helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
}
+
+ @Test
+ public void testRebalanceWithMinimizeDataMovementInstanceAssignments()
+ throws Exception {
+ // TODO: try balanced (default) instance assignment
Review Comment:
nit: is this still a TODO?
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java:
##########
@@ -504,6 +505,208 @@ public void testMirrorServerSetBasedRandom() throws
FileNotFoundException {
testMirrorServerSetBasedRandomInner(10000000);
}
+ @Test
+ public void testForceMinimizeDataMovement() {
+ // This test case is using the same instance rebalance plot as
testMinimizeDataMovement, and test whether
+ // forceMinimizeDataMovement flag in InstanceAssignmentDriver works as the
minimizeDataMovement flag in
+ // TableConfig does.
+ int numReplicas = 3;
+ int numPartitions = 2;
+ int numInstancesPerPartition = 2;
+ String partitionColumn = "partition";
+
+ // Configs and driver that minimize data movement
+ InstanceAssignmentConfig instanceAssignmentConfig = new
InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, false,
+ partitionColumn), null, true);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(numReplicas)
+ .setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig))
+ .build();
+
+ InstanceAssignmentDriver driver = new
InstanceAssignmentDriver(tableConfig);
+
+ // Configs and driver that DO NOT minimize data movement
+ InstanceAssignmentConfig instanceAssignmentConfigNotMinimized = new
InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, false,
+ partitionColumn), null, false);
+
+ TableConfig tableConfigNotMinimized = new TableConfig(tableConfig);
+ tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfigNotMinimized));
+ InstanceAssignmentDriver driverNotMinimized = new
InstanceAssignmentDriver(tableConfigNotMinimized);
+
+
+ int numInstances = 10;
+ List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
+ for (int i = 0; i < numInstances; i++) {
+ InstanceConfig instanceConfig = new
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ instanceConfig.addTag(OFFLINE_TAG);
+ instanceConfigs.add(instanceConfig);
+ }
+
+ // Instances should be assigned to 3 replica-groups with a round-robin
fashion, each with 2 instances
+ InstancePartitions instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, null, false);
+
+ InstancePartitions instancePartitionsForcedMinimize =
+ driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, null, true);
+
+ InstancePartitions instancePartitionsNotMinimize =
+ driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, null, false);
+
+ // Assignment without existing InstancePartitions should be the same for
all scenarios
Review Comment:
what does this comment mean?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java:
##########
@@ -55,40 +55,67 @@ public InstanceAssignmentDriver(TableConfig tableConfig) {
public InstancePartitions assignInstances(InstancePartitionsType
instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions
existingInstancePartitions) {
+ return assignInstances(instancePartitionsType, instanceConfigs,
existingInstancePartitions, false);
+ }
+
+ public InstancePartitions assignInstances(InstancePartitionsType
instancePartitionsType,
+ List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions
existingInstancePartitions,
+ boolean forceMinimizeDataMovement) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig,
instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
- assignmentConfig, instanceConfigs, existingInstancePartitions, null);
+ assignmentConfig, instanceConfigs, existingInstancePartitions, null,
forceMinimizeDataMovement);
}
public InstancePartitions assignInstances(InstancePartitionsType
instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions
existingInstancePartitions,
@Nullable InstancePartitions preConfiguredInstancePartitions) {
+ return assignInstances(instancePartitionsType, instanceConfigs,
existingInstancePartitions,
+ preConfiguredInstancePartitions, false);
+ }
+
+ public InstancePartitions assignInstances(InstancePartitionsType
instancePartitionsType,
+ List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions
existingInstancePartitions,
+ @Nullable InstancePartitions preConfiguredInstancePartitions, boolean
forceMinimizeDataMovement) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig,
instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
- assignmentConfig, instanceConfigs, existingInstancePartitions,
preConfiguredInstancePartitions);
+ assignmentConfig, instanceConfigs, existingInstancePartitions,
preConfiguredInstancePartitions,
+ forceMinimizeDataMovement);
}
public InstancePartitions assignInstances(String tierName,
List<InstanceConfig> instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions,
InstanceAssignmentConfig instanceAssignmentConfig) {
+ return assignInstances(tierName, instanceConfigs,
existingInstancePartitions, instanceAssignmentConfig, false);
+ }
+
+ public InstancePartitions assignInstances(String tierName,
List<InstanceConfig> instanceConfigs,
+ @Nullable InstancePartitions existingInstancePartitions,
InstanceAssignmentConfig instanceAssignmentConfig,
+ boolean forceMinimizeDataMovement) {
return getInstancePartitions(
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(),
tierName),
- instanceAssignmentConfig, instanceConfigs, existingInstancePartitions,
null);
+ instanceAssignmentConfig, instanceConfigs, existingInstancePartitions,
null, forceMinimizeDataMovement);
}
private InstancePartitions getInstancePartitions(String
instancePartitionsName,
InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig>
instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions,
- @Nullable InstancePartitions preConfiguredInstancePartitions) {
+ @Nullable InstancePartitions preConfiguredInstancePartitions, boolean
forceMinimizeDataMovement) {
String tableNameWithType = _tableConfig.getTableName();
LOGGER.info("Starting {} instance assignment for table {}",
instancePartitionsName, tableNameWithType);
- boolean minimizeDataMovement =
instanceAssignmentConfig.isMinimizeDataMovement();
+ boolean minimizeDataMovement = forceMinimizeDataMovement ||
instanceAssignmentConfig.isMinimizeDataMovement();
Review Comment:
Can you also add a comment here explaining why it is okay to blindly set
`minimizeDataMovement` to `true`? The reason is that, for balanced mode, there
is a check that resets this flag if the instance partitions don't actually
exist.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -1303,7 +1303,7 @@ private void handleLegacySchemaConfig(TableConfig
tableConfig, HttpHeaders httpH
private void validateInstanceAssignment(TableConfig tableConfig) {
TableRebalancer tableRebalancer = new
TableRebalancer(_pinotHelixResourceManager.getHelixZkManager());
try {
- tableRebalancer.getInstancePartitionsMap(tableConfig, true, true, true);
+ tableRebalancer.getInstancePartitionsMap(tableConfig, true, true, true,
false);
Review Comment:
Let's use the method overload without the `forceMinimizeDataMovement` parameter.
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -886,6 +886,188 @@ public void testRebalanceWithTiersAndInstanceAssignments()
_helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
}
+
+ @Test
+ public void testRebalanceWithMinimizeDataMovementInstanceAssignments()
+ throws Exception {
+ // TODO: try balanced (default) instance assignment
+ int numServers = 6;
+ for (int i = 0; i < numServers; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
i, true);
+ }
+
+ // Create the table with default balanced segment assignment
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+
+ addDummySchema(RAW_TABLE_NAME);
+ _helixResourceManager.addTable(tableConfig);
+
+ // Add the segments
+ int numSegments = 10;
+ long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+ for (int i = 0; i < numSegments; i++) {
+ _helixResourceManager.addNewSegment(OFFLINE_TIERED_TABLE_NAME,
Review Comment:
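nit: Are you testing tiering here? Why use the tiered table name? The table is
created with `RAW_TABLE_NAME`, but the segments are added to (and the ideal
state is read from) `OFFLINE_TIERED_TABLE_NAME`. I'd recommend adding a
separate test for tiering if you want to cover both tiered and non-tiered tables.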
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java:
##########
@@ -55,40 +55,67 @@ public InstanceAssignmentDriver(TableConfig tableConfig) {
public InstancePartitions assignInstances(InstancePartitionsType
instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions
existingInstancePartitions) {
+ return assignInstances(instancePartitionsType, instanceConfigs,
existingInstancePartitions, false);
+ }
+
+ public InstancePartitions assignInstances(InstancePartitionsType
instancePartitionsType,
+ List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions
existingInstancePartitions,
+ boolean forceMinimizeDataMovement) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig,
instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
- assignmentConfig, instanceConfigs, existingInstancePartitions, null);
+ assignmentConfig, instanceConfigs, existingInstancePartitions, null,
forceMinimizeDataMovement);
}
public InstancePartitions assignInstances(InstancePartitionsType
instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions
existingInstancePartitions,
@Nullable InstancePartitions preConfiguredInstancePartitions) {
+ return assignInstances(instancePartitionsType, instanceConfigs,
existingInstancePartitions,
+ preConfiguredInstancePartitions, false);
+ }
+
+ public InstancePartitions assignInstances(InstancePartitionsType
instancePartitionsType,
+ List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions
existingInstancePartitions,
+ @Nullable InstancePartitions preConfiguredInstancePartitions, boolean
forceMinimizeDataMovement) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig,
instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
- assignmentConfig, instanceConfigs, existingInstancePartitions,
preConfiguredInstancePartitions);
+ assignmentConfig, instanceConfigs, existingInstancePartitions,
preConfiguredInstancePartitions,
+ forceMinimizeDataMovement);
}
public InstancePartitions assignInstances(String tierName,
List<InstanceConfig> instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions,
InstanceAssignmentConfig instanceAssignmentConfig) {
+ return assignInstances(tierName, instanceConfigs,
existingInstancePartitions, instanceAssignmentConfig, false);
+ }
+
+ public InstancePartitions assignInstances(String tierName,
List<InstanceConfig> instanceConfigs,
+ @Nullable InstancePartitions existingInstancePartitions,
InstanceAssignmentConfig instanceAssignmentConfig,
+ boolean forceMinimizeDataMovement) {
return getInstancePartitions(
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(),
tierName),
- instanceAssignmentConfig, instanceConfigs, existingInstancePartitions,
null);
+ instanceAssignmentConfig, instanceConfigs, existingInstancePartitions,
null, forceMinimizeDataMovement);
}
private InstancePartitions getInstancePartitions(String
instancePartitionsName,
InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig>
instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions,
- @Nullable InstancePartitions preConfiguredInstancePartitions) {
+ @Nullable InstancePartitions preConfiguredInstancePartitions, boolean
forceMinimizeDataMovement) {
String tableNameWithType = _tableConfig.getTableName();
LOGGER.info("Starting {} instance assignment for table {}",
instancePartitionsName, tableNameWithType);
- boolean minimizeDataMovement =
instanceAssignmentConfig.isMinimizeDataMovement();
+ boolean minimizeDataMovement = forceMinimizeDataMovement ||
instanceAssignmentConfig.isMinimizeDataMovement();
+ if (forceMinimizeDataMovement) {
+ LOGGER.info("Table: {} is forced with minimizeDataMovement",
tableNameWithType);
+ } else if (instanceAssignmentConfig.isMinimizeDataMovement()) {
+ LOGGER.info("Table: {} has configured minimizeDataMovement",
tableNameWithType);
+ } else {
+ LOGGER.warn("Table: {} is being rebalanced without
minimizeDataMovement", tableNameWithType);
+ }
Review Comment:
Let's just add a single log line that prints `tableNameWithType`,
`instanceAssignmentConfig.isMinimizeDataMovement()`, and
`forceMinimizeDataMovement`. There's no need for this kind of if-else.
Also, an info log is enough for now.
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java:
##########
@@ -504,6 +505,208 @@ public void testMirrorServerSetBasedRandom() throws
FileNotFoundException {
testMirrorServerSetBasedRandomInner(10000000);
}
+ @Test
+ public void testForceMinimizeDataMovement() {
Review Comment:
Can we add a similar test for balanced assignment? Just show that the result is
always the same (pick one scenario, such as increasing the RF).
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -886,6 +886,188 @@ public void testRebalanceWithTiersAndInstanceAssignments()
_helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
}
+
+ @Test
+ public void testRebalanceWithMinimizeDataMovementInstanceAssignments()
Review Comment:
Can we split this into two tests, one for balanced assignment and one for
replica-group assignment? That will make it easier to read.
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -886,6 +886,188 @@ public void testRebalanceWithTiersAndInstanceAssignments()
_helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
}
+
+ @Test
+ public void testRebalanceWithMinimizeDataMovementInstanceAssignments()
+ throws Exception {
+ // TODO: try balanced (default) instance assignment
+ int numServers = 6;
+ for (int i = 0; i < numServers; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
i, true);
+ }
+
+ // Create the table with default balanced segment assignment
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+
+ addDummySchema(RAW_TABLE_NAME);
+ _helixResourceManager.addTable(tableConfig);
+
+ // Add the segments
+ int numSegments = 10;
+ long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+ for (int i = 0; i < numSegments; i++) {
+ _helixResourceManager.addNewSegment(OFFLINE_TIERED_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(TIERED_TABLE_NAME,
SEGMENT_NAME_PREFIX + i,
+ nowInDays), null);
+ }
+
+ Map<String, Map<String, String>> oldSegmentAssignment =
+
_helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields();
+
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
+
+ // Try dry-run summary mode
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceConfig.setMinimizeDataMovement(true);
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+
+ RebalanceSummaryResult rebalanceSummaryResult =
rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ RebalanceSummaryResult.ServerInfo rebalanceServerInfo =
rebalanceSummaryResult.getServerInfo();
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+
assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(),
numServers);
+
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig(), null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ // Segment assignment should not change
+ assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+ // add one server instance
+ addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
numServers, true);
+
+ // Table without instance assignment config should work fine (ignore) with
the minimizeDataMovement flag set
+ // Try dry-run summary mode
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceConfig.setMinimizeDataMovement(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+
+ rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ rebalanceServerInfo = rebalanceSummaryResult.getServerInfo();
+ // Should see the added server
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
assertEquals(rebalanceServerInfo.getNumServers().getValueBeforeRebalance(),
numServers);
+
assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(),
numServers + 1);
+
+ // Rebalance
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceConfig.setMinimizeDataMovement(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ // Should see the added server in the instance assignment
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
assertEquals(rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE).getInstances(0,
0).size(), numServers + 1);
+
+ // One instance per replica group, no partition
+ InstanceAssignmentConfig instanceAssignmentConfig = new
InstanceAssignmentConfig(
+ new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null),
false, 0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 1, 0,
0, false,
+ null), null, false);
+
+ tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig));
+ _helixResourceManager.updateTableConfig(tableConfig);
+
+ // Try dry-run summary mode
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceConfig.setMinimizeDataMovement(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+
+ rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ rebalanceServerInfo = rebalanceSummaryResult.getServerInfo();
+ // With instance assignment config, the number of servers is reduced to
the number of replica groups
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
assertEquals(rebalanceServerInfo.getNumServers().getValueBeforeRebalance(),
numServers + 1);
+
assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(),
NUM_REPLICAS);
+
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceConfig.setMinimizeDataMovement(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
assertEquals(rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE).getNumReplicaGroups(),
NUM_REPLICAS);
+
+
+ // increase replica group size by 1
+ instanceAssignmentConfig = new InstanceAssignmentConfig(
+ new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null),
false, 0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS + 1, 1,
0, 0, false,
+ null), null, true);
+
+ tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig));
+ _helixResourceManager.updateTableConfig(tableConfig);
+
+ // Try dry-run summary mode
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceConfig.setMinimizeDataMovement(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ rebalanceServerInfo =
rebalanceResult.getRebalanceSummaryResult().getServerInfo();
+
+ // with minimize data movement, we should add 1 server only
Review Comment:
What happens in this test case if we use `setMinimizeDataMovement(false)`?
And similarly for the case where a replica is removed? I'm wondering whether
your test is able to capture the difference between the two modes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]