somandal commented on code in PR #15110:
URL: https://github.com/apache/pinot/pull/15110#discussion_r1970742135


##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java:
##########
@@ -504,6 +505,208 @@ public void testMirrorServerSetBasedRandom() throws 
FileNotFoundException {
     testMirrorServerSetBasedRandomInner(10000000);
   }
 
+  @Test
+  public void testForceMinimizeDataMovement() {
+    // This test case is using the same instance rebalance plot as 
testMinimizeDataMovement, and test whether
+    // forceMinimizeDataMovement flag in InstanceAssignmentDriver works as the 
minimizeDataMovement flag in
+    // TableConfig does.
+    int numReplicas = 3;
+    int numPartitions = 2;
+    int numInstancesPerPartition = 2;
+    String partitionColumn = "partition";
+
+    // Configs and driver that minimize data movement
+    InstanceAssignmentConfig instanceAssignmentConfig = new 
InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, false,
+            partitionColumn), null, true);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        .setNumReplicas(numReplicas)
+        .setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfig))
+        .build();
+
+    InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
+
+    // Configs and driver that DO NOT minimize data movement
+    InstanceAssignmentConfig instanceAssignmentConfigNotMinimized = new 
InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, false,
+            partitionColumn), null, false);
+
+    TableConfig tableConfigNotMinimized = new TableConfig(tableConfig);
+    tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfigNotMinimized));
+    InstanceAssignmentDriver driverNotMinimized = new 
InstanceAssignmentDriver(tableConfigNotMinimized);

Review Comment:
   Can you add an assert that checks that this TableConfig has 
minimizeDataMovement = false?



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -886,6 +886,188 @@ public void testRebalanceWithTiersAndInstanceAssignments()
     _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
   }
 
+
+  @Test
+  public void testRebalanceWithMinimizeDataMovementInstanceAssignments()
+      throws Exception {
+    // TODO: try balanced (default) instance assignment
+    int numServers = 6;
+    for (int i = 0; i < numServers; i++) {
+      addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + 
i, true);
+    }
+
+    // Create the table with default balanced segment assignment
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+
+    addDummySchema(RAW_TABLE_NAME);
+    _helixResourceManager.addTable(tableConfig);
+
+    // Add the segments
+    int numSegments = 10;
+    long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+    for (int i = 0; i < numSegments; i++) {
+      _helixResourceManager.addNewSegment(OFFLINE_TIERED_TABLE_NAME,
+          
SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(TIERED_TABLE_NAME, 
SEGMENT_NAME_PREFIX + i,
+              nowInDays), null);
+    }
+
+    Map<String, Map<String, String>> oldSegmentAssignment =
+        
_helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields();
+
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
+
+    // Try dry-run summary mode
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(true);
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+
+    RebalanceSummaryResult rebalanceSummaryResult = 
rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    RebalanceSummaryResult.ServerInfo rebalanceServerInfo = 
rebalanceSummaryResult.getServerInfo();
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    
assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(),
 numServers);
+
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    // Segment assignment should not change
+    assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+    // add one server instance
+    addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + 
numServers, true);
+
+    // Table without instance assignment config should work fine (ignore) with 
the minimizeDataMovement flag set
+    // Try dry-run summary mode
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+
+    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    rebalanceServerInfo = rebalanceSummaryResult.getServerInfo();
+    // Should see the added server
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    
assertEquals(rebalanceServerInfo.getNumServers().getValueBeforeRebalance(), 
numServers);
+    
assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(),
 numServers + 1);

Review Comment:
   is it possible to validate that the expected changes are the same for 
`setMinimizeDataMovement(true)` and `setMinimizeDataMovement(false)`?



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java:
##########
@@ -504,6 +505,208 @@ public void testMirrorServerSetBasedRandom() throws 
FileNotFoundException {
     testMirrorServerSetBasedRandomInner(10000000);
   }
 
+  @Test
+  public void testForceMinimizeDataMovement() {
+    // This test case is using the same instance rebalance plot as 
testMinimizeDataMovement, and test whether
+    // forceMinimizeDataMovement flag in InstanceAssignmentDriver works as the 
minimizeDataMovement flag in
+    // TableConfig does.
+    int numReplicas = 3;
+    int numPartitions = 2;
+    int numInstancesPerPartition = 2;
+    String partitionColumn = "partition";
+
+    // Configs and driver that minimize data movement
+    InstanceAssignmentConfig instanceAssignmentConfig = new 
InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, false,
+            partitionColumn), null, true);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        .setNumReplicas(numReplicas)
+        .setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfig))
+        .build();

Review Comment:
   Can you add an assert that checks that this TableConfig has 
minimizeDataMovement = true?



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -886,6 +886,188 @@ public void testRebalanceWithTiersAndInstanceAssignments()
     _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
   }
 
+
+  @Test
+  public void testRebalanceWithMinimizeDataMovementInstanceAssignments()
+      throws Exception {
+    // TODO: try balanced (default) instance assignment

Review Comment:
   nit: is this still a TODO?



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java:
##########
@@ -504,6 +505,208 @@ public void testMirrorServerSetBasedRandom() throws 
FileNotFoundException {
     testMirrorServerSetBasedRandomInner(10000000);
   }
 
+  @Test
+  public void testForceMinimizeDataMovement() {
+    // This test case is using the same instance rebalance plot as 
testMinimizeDataMovement, and test whether
+    // forceMinimizeDataMovement flag in InstanceAssignmentDriver works as the 
minimizeDataMovement flag in
+    // TableConfig does.
+    int numReplicas = 3;
+    int numPartitions = 2;
+    int numInstancesPerPartition = 2;
+    String partitionColumn = "partition";
+
+    // Configs and driver that minimize data movement
+    InstanceAssignmentConfig instanceAssignmentConfig = new 
InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, false,
+            partitionColumn), null, true);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        .setNumReplicas(numReplicas)
+        .setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfig))
+        .build();
+
+    InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
+
+    // Configs and driver that DO NOT minimize data movement
+    InstanceAssignmentConfig instanceAssignmentConfigNotMinimized = new 
InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, false,
+            partitionColumn), null, false);
+
+    TableConfig tableConfigNotMinimized = new TableConfig(tableConfig);
+    tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfigNotMinimized));
+    InstanceAssignmentDriver driverNotMinimized = new 
InstanceAssignmentDriver(tableConfigNotMinimized);
+
+
+    int numInstances = 10;
+    List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
+    for (int i = 0; i < numInstances; i++) {
+      InstanceConfig instanceConfig = new 
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Instances should be assigned to 3 replica-groups with a round-robin 
fashion, each with 2 instances
+    InstancePartitions instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, null, false);
+
+    InstancePartitions instancePartitionsForcedMinimize =
+        driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, null, true);
+
+    InstancePartitions instancePartitionsNotMinimize =
+        driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, null, false);
+
+    // Assignment without existing InstancePartitions should be the same for 
all scenarios

Review Comment:
   what does this comment mean?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java:
##########
@@ -55,40 +55,67 @@ public InstanceAssignmentDriver(TableConfig tableConfig) {
 
   public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
       List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions) {
+    return assignInstances(instancePartitionsType, instanceConfigs, 
existingInstancePartitions, false);
+  }
+
+  public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
+      boolean forceMinimizeDataMovement) {
     String tableNameWithType = _tableConfig.getTableName();
     InstanceAssignmentConfig assignmentConfig =
         
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, 
instancePartitionsType);
     return getInstancePartitions(
         
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
-        assignmentConfig, instanceConfigs, existingInstancePartitions, null);
+        assignmentConfig, instanceConfigs, existingInstancePartitions, null, 
forceMinimizeDataMovement);
   }
 
   public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
       List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
       @Nullable InstancePartitions preConfiguredInstancePartitions) {
+    return assignInstances(instancePartitionsType, instanceConfigs, 
existingInstancePartitions,
+        preConfiguredInstancePartitions, false);
+  }
+
+  public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
+      @Nullable InstancePartitions preConfiguredInstancePartitions, boolean 
forceMinimizeDataMovement) {
     String tableNameWithType = _tableConfig.getTableName();
     InstanceAssignmentConfig assignmentConfig =
         
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, 
instancePartitionsType);
     return getInstancePartitions(
         
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
-        assignmentConfig, instanceConfigs, existingInstancePartitions, 
preConfiguredInstancePartitions);
+        assignmentConfig, instanceConfigs, existingInstancePartitions, 
preConfiguredInstancePartitions,
+        forceMinimizeDataMovement);
   }
 
   public InstancePartitions assignInstances(String tierName, 
List<InstanceConfig> instanceConfigs,
       @Nullable InstancePartitions existingInstancePartitions, 
InstanceAssignmentConfig instanceAssignmentConfig) {
+    return assignInstances(tierName, instanceConfigs, 
existingInstancePartitions, instanceAssignmentConfig, false);
+  }
+
+  public InstancePartitions assignInstances(String tierName, 
List<InstanceConfig> instanceConfigs,
+      @Nullable InstancePartitions existingInstancePartitions, 
InstanceAssignmentConfig instanceAssignmentConfig,
+      boolean forceMinimizeDataMovement) {
     return getInstancePartitions(
         
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(),
 tierName),
-        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, 
null);
+        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, 
null, forceMinimizeDataMovement);
   }
 
   private InstancePartitions getInstancePartitions(String 
instancePartitionsName,
       InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> 
instanceConfigs,
       @Nullable InstancePartitions existingInstancePartitions,
-      @Nullable InstancePartitions preConfiguredInstancePartitions) {
+      @Nullable InstancePartitions preConfiguredInstancePartitions, boolean 
forceMinimizeDataMovement) {
     String tableNameWithType = _tableConfig.getTableName();
     LOGGER.info("Starting {} instance assignment for table {}", 
instancePartitionsName, tableNameWithType);
 
-    boolean minimizeDataMovement = 
instanceAssignmentConfig.isMinimizeDataMovement();
+    boolean minimizeDataMovement = forceMinimizeDataMovement || 
instanceAssignmentConfig.isMinimizeDataMovement();

Review Comment:
   Can you also add a comment here stating that blindly setting 
`minimizeDataMovement` to `true` is okay, and explaining why: for balanced 
mode there is a check that resets this if instance partitions don't actually 
exist.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -1303,7 +1303,7 @@ private void handleLegacySchemaConfig(TableConfig 
tableConfig, HttpHeaders httpH
   private void validateInstanceAssignment(TableConfig tableConfig) {
     TableRebalancer tableRebalancer = new 
TableRebalancer(_pinotHelixResourceManager.getHelixZkManager());
     try {
-      tableRebalancer.getInstancePartitionsMap(tableConfig, true, true, true);
+      tableRebalancer.getInstancePartitionsMap(tableConfig, true, true, true, 
false);

Review Comment:
   let's use the method signature without the `forceMinimizeDataMovement`?



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -886,6 +886,188 @@ public void testRebalanceWithTiersAndInstanceAssignments()
     _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
   }
 
+
+  @Test
+  public void testRebalanceWithMinimizeDataMovementInstanceAssignments()
+      throws Exception {
+    // TODO: try balanced (default) instance assignment
+    int numServers = 6;
+    for (int i = 0; i < numServers; i++) {
+      addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + 
i, true);
+    }
+
+    // Create the table with default balanced segment assignment
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+
+    addDummySchema(RAW_TABLE_NAME);
+    _helixResourceManager.addTable(tableConfig);
+
+    // Add the segments
+    int numSegments = 10;
+    long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+    for (int i = 0; i < numSegments; i++) {
+      _helixResourceManager.addNewSegment(OFFLINE_TIERED_TABLE_NAME,

Review Comment:
   nit: are you testing tiering here? why the tiered table name? i'd recommend 
adding a separate test for tiering if you want to test tier and non-tier



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java:
##########
@@ -55,40 +55,67 @@ public InstanceAssignmentDriver(TableConfig tableConfig) {
 
   public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
       List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions) {
+    return assignInstances(instancePartitionsType, instanceConfigs, 
existingInstancePartitions, false);
+  }
+
+  public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
+      boolean forceMinimizeDataMovement) {
     String tableNameWithType = _tableConfig.getTableName();
     InstanceAssignmentConfig assignmentConfig =
         
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, 
instancePartitionsType);
     return getInstancePartitions(
         
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
-        assignmentConfig, instanceConfigs, existingInstancePartitions, null);
+        assignmentConfig, instanceConfigs, existingInstancePartitions, null, 
forceMinimizeDataMovement);
   }
 
   public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
       List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
       @Nullable InstancePartitions preConfiguredInstancePartitions) {
+    return assignInstances(instancePartitionsType, instanceConfigs, 
existingInstancePartitions,
+        preConfiguredInstancePartitions, false);
+  }
+
+  public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
+      @Nullable InstancePartitions preConfiguredInstancePartitions, boolean 
forceMinimizeDataMovement) {
     String tableNameWithType = _tableConfig.getTableName();
     InstanceAssignmentConfig assignmentConfig =
         
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, 
instancePartitionsType);
     return getInstancePartitions(
         
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
-        assignmentConfig, instanceConfigs, existingInstancePartitions, 
preConfiguredInstancePartitions);
+        assignmentConfig, instanceConfigs, existingInstancePartitions, 
preConfiguredInstancePartitions,
+        forceMinimizeDataMovement);
   }
 
   public InstancePartitions assignInstances(String tierName, 
List<InstanceConfig> instanceConfigs,
       @Nullable InstancePartitions existingInstancePartitions, 
InstanceAssignmentConfig instanceAssignmentConfig) {
+    return assignInstances(tierName, instanceConfigs, 
existingInstancePartitions, instanceAssignmentConfig, false);
+  }
+
+  public InstancePartitions assignInstances(String tierName, 
List<InstanceConfig> instanceConfigs,
+      @Nullable InstancePartitions existingInstancePartitions, 
InstanceAssignmentConfig instanceAssignmentConfig,
+      boolean forceMinimizeDataMovement) {
     return getInstancePartitions(
         
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(),
 tierName),
-        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, 
null);
+        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, 
null, forceMinimizeDataMovement);
   }
 
   private InstancePartitions getInstancePartitions(String 
instancePartitionsName,
       InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> 
instanceConfigs,
       @Nullable InstancePartitions existingInstancePartitions,
-      @Nullable InstancePartitions preConfiguredInstancePartitions) {
+      @Nullable InstancePartitions preConfiguredInstancePartitions, boolean 
forceMinimizeDataMovement) {
     String tableNameWithType = _tableConfig.getTableName();
     LOGGER.info("Starting {} instance assignment for table {}", 
instancePartitionsName, tableNameWithType);
 
-    boolean minimizeDataMovement = 
instanceAssignmentConfig.isMinimizeDataMovement();
+    boolean minimizeDataMovement = forceMinimizeDataMovement || 
instanceAssignmentConfig.isMinimizeDataMovement();
+    if (forceMinimizeDataMovement) {
+      LOGGER.info("Table: {} is forced with minimizeDataMovement", 
tableNameWithType);
+    } else if (instanceAssignmentConfig.isMinimizeDataMovement()) {
+      LOGGER.info("Table: {} has configured minimizeDataMovement", 
tableNameWithType);
+    } else {
+      LOGGER.warn("Table: {} is being rebalanced without 
minimizeDataMovement", tableNameWithType);
+    }

Review Comment:
   Let's just add a single log line where you print `tableNameWithType`, 
`instanceAssignmentConfig.isMinimizeDataMovement()`, and 
`forceMinimizeDataMovement`. No need for this kind of if-else.
   
   Also, it's enough to keep this as an info log for now.



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java:
##########
@@ -504,6 +505,208 @@ public void testMirrorServerSetBasedRandom() throws 
FileNotFoundException {
     testMirrorServerSetBasedRandomInner(10000000);
   }
 
+  @Test
+  public void testForceMinimizeDataMovement() {

Review Comment:
   Can we add a similar test for balanced assignment - just show that the 
result is always the same (pick 1 scenario like increasing RF)?



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -886,6 +886,188 @@ public void testRebalanceWithTiersAndInstanceAssignments()
     _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
   }
 
+
+  @Test
+  public void testRebalanceWithMinimizeDataMovementInstanceAssignments()

Review Comment:
   can we split this into 2 tests - 1 for balanced assignment and 1 for 
replicaGroup assignment? it'll be easier to read then



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -886,6 +886,188 @@ public void testRebalanceWithTiersAndInstanceAssignments()
     _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
   }
 
+
+  @Test
+  public void testRebalanceWithMinimizeDataMovementInstanceAssignments()
+      throws Exception {
+    // TODO: try balanced (default) instance assignment
+    int numServers = 6;
+    for (int i = 0; i < numServers; i++) {
+      addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + 
i, true);
+    }
+
+    // Create the table with default balanced segment assignment
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+
+    addDummySchema(RAW_TABLE_NAME);
+    _helixResourceManager.addTable(tableConfig);
+
+    // Add the segments
+    int numSegments = 10;
+    long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+    for (int i = 0; i < numSegments; i++) {
+      _helixResourceManager.addNewSegment(OFFLINE_TIERED_TABLE_NAME,
+          
SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(TIERED_TABLE_NAME, 
SEGMENT_NAME_PREFIX + i,
+              nowInDays), null);
+    }
+
+    Map<String, Map<String, String>> oldSegmentAssignment =
+        
_helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields();
+
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
+
+    // Try dry-run summary mode
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(true);
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+
+    RebalanceSummaryResult rebalanceSummaryResult = 
rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    RebalanceSummaryResult.ServerInfo rebalanceServerInfo = 
rebalanceSummaryResult.getServerInfo();
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    
assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(),
 numServers);
+
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    // Segment assignment should not change
+    assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+    // add one server instance
+    addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + 
numServers, true);
+
+    // Table without instance assignment config should work fine (ignore) with 
the minimizeDataMovement flag set
+    // Try dry-run summary mode
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+
+    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    rebalanceServerInfo = rebalanceSummaryResult.getServerInfo();
+    // Should see the added server
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    
assertEquals(rebalanceServerInfo.getNumServers().getValueBeforeRebalance(), 
numServers);
+    
assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(),
 numServers + 1);
+
+    // Rebalance
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    // Should see the added server in the instance assignment
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    
assertEquals(rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE).getInstances(0,
 0).size(), numServers + 1);
+
+    // One instance per replica group, no partition
+    InstanceAssignmentConfig instanceAssignmentConfig = new 
InstanceAssignmentConfig(
+        new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), 
false, 0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 1, 0, 
0, false,
+            null), null, false);
+
+    tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfig));
+    _helixResourceManager.updateTableConfig(tableConfig);
+
+    // Try dry-run summary mode
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+
+    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    rebalanceServerInfo = rebalanceSummaryResult.getServerInfo();
+    // With instance assignment config, the number of servers is reduced to 
the number of replica groups
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    
assertEquals(rebalanceServerInfo.getNumServers().getValueBeforeRebalance(), 
numServers + 1);
+    
assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(),
 NUM_REPLICAS);
+
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    
assertEquals(rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE).getNumReplicaGroups(),
 NUM_REPLICAS);
+
+
+    // increase replica group size by 1
+    instanceAssignmentConfig = new InstanceAssignmentConfig(
+        new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), 
false, 0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS + 1, 1, 
0, 0, false,
+            null), null, true);
+
+    tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfig));
+    _helixResourceManager.updateTableConfig(tableConfig);
+
+    // Try dry-run summary mode
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    rebalanceServerInfo = 
rebalanceResult.getRebalanceSummaryResult().getServerInfo();
+
+    // with minimize data movement, we should add 1 server only

Review Comment:
   what happens for this test case if we use `setMinimizeDataMovement(false)`? 
Similarly for the case when a replica is removed? Just wondering if your test 
is able to capture the difference between the two modes or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to