This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 351e4641cc Fix a bug for partition enabled instance assignment with minimize movement (#14726)
351e4641cc is described below

commit 351e4641cc21a053e54ab67cfce65f57e562ce6d
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Dec 30 18:29:11 2024 -0800

    Fix a bug for partition enabled instance assignment with minimize movement (#14726)
---
 .../InstanceReplicaGroupPartitionSelector.java     |   3 +-
 .../instance/InstanceAssignmentTest.java           | 279 ++++++++++++++++-----
 .../SegmentsValidationAndRetentionConfig.java      |  17 ++
 .../spi/utils/builder/TableConfigBuilder.java      |   1 +
 4 files changed, 243 insertions(+), 57 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index 8da6dbe2f6..b8c19ede69 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -411,7 +411,8 @@ public class InstanceReplicaGroupPartitionSelector extends 
InstancePartitionSele
       for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
         List<String> instancesInReplicaGroup = 
replicaGroupIdToInstancesMap.get(replicaGroupId);
         if (replicaGroupId < existingNumReplicaGroups) {
-          int maxNumPartitionsPerInstance = (numInstancesPerReplicaGroup + 
numPartitions - 1) / numPartitions;
+          int maxNumPartitionsPerInstance =
+              (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
           Map<String, Integer> instanceToNumPartitionsMap =
               Maps.newHashMapWithExpectedSize(numInstancesPerReplicaGroup);
           for (String instance : instancesInReplicaGroup) {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 113d4e1649..39aef7f35a 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import org.apache.helix.model.InstanceConfig;
@@ -115,15 +116,15 @@ public class InstanceAssignmentTest {
     // Instance of index 7 is not assigned because of the hash-based rotation
     // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
     // [i8, i9, i0, i1, i2, i3, i4, i5, i6, i7]
-    //  r0, r1, r2, r0, r1, r2, r0, r1, r2
+    //  r0  r1  r2  r0  r1  r2  r0  r1  r2
     // r0: [i8, i1, i4]
-    //      p0, p0, p1
+    //      p0  p0  p1
     //      p1
     // r1: [i9, i2, i5]
-    //      p0, p0, p1
+    //      p0  p0  p1
     //      p1
     // r2: [i0, i3, i6]
-    //      p0, p0, p1
+    //      p0  p0  p1
     //      p1
     assertEquals(instancePartitions.getInstances(0, 0),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX 
+ 1));
@@ -137,31 +138,52 @@ public class InstanceAssignmentTest {
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 3));
     assertEquals(instancePartitions.getInstances(1, 2),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX 
+ 0));
+  }
 
-    // ===== Test against the cases when the existing instancePartitions isn't 
null,
-    //       and minimizeDataMovement is set to true. =====
-    // Put the existing instancePartitions as the parameter to the 
InstanceAssignmentDriver.
-    // The returned instance partition should be the same as the last computed 
one.
-    tableConfig.getValidationConfig().setMinimizeDataMovement(true);
+  @Test
+  public void testMinimizeDataMovement() {
+    int numReplicas = 3;
+    int numPartitions = 2;
+    int numInstancesPerPartition = 2;
+    String partitionColumn = "partition";
+    InstanceAssignmentConfig instanceAssignmentConfig = new 
InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, true,
+            partitionColumn), null, true);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        .setNumReplicas(numReplicas)
+        .setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfig))
+        .build();
+
+    int numInstances = 10;
+    List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
+    for (int i = 0; i < numInstances; i++) {
+      InstanceConfig instanceConfig = new 
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      instanceConfigs.add(instanceConfig);
+    }
 
+    // Start without existing InstancePartitions:
     // Instances should be assigned to 3 replica-groups with a round-robin 
fashion, each with 3 instances, then these 3
     // instances should be assigned to 2 partitions, each with 2 instances
-    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
instancePartitions);
+    InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
+    InstancePartitions instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
     assertEquals(instancePartitions.getNumPartitions(), numPartitions);
 
     // Instance of index 7 is not assigned because of the hash-based rotation
     // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
     // [i8, i9, i0, i1, i2, i3, i4, i5, i6, i7]
-    //  r0, r1, r2, r0, r1, r2, r0, r1, r2
+    //  r0  r1  r2  r0  r1  r2  r0  r1  r2
     // r0: [i8, i1, i4]
-    //      p0, p0, p1
+    //      p0  p0  p1
     //      p1
     // r1: [i9, i2, i5]
-    //      p0, p0, p1
+    //      p0  p0  p1
     //      p1
     // r2: [i0, i3, i6]
-    //      p0, p0, p1
+    //      p0  p0  p1
     //      p1
     assertEquals(instancePartitions.getInstances(0, 0),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX 
+ 1));
@@ -196,15 +218,15 @@ public class InstanceAssignmentTest {
     // Instance of index 7 is not assigned because of the hash-based rotation
     // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
     // [i8, i9, i0, i1, i10, i3, i4, i5, i11, i7]
-    //  r0, r1, r2, r0, r1, r2, r0, r1, r2
+    //  r0  r1  r2  r0   r1  r2  r0  r1   r2
     // r0: [i8, i1, i4]
-    //      p0, p0, p1
+    //      p0  p0  p1
     //      p1
     // r1: [i9, i5, i10]
-    //      p0, p1, p0
+    //      p0  p1   p0
     //      p1
     // r2: [i0, i3, i11]
-    //      p0, p0, p1
+    //      p0  p0   p1
     //      p1
     assertEquals(instancePartitions.getInstances(0, 0),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX 
+ 1));
@@ -226,24 +248,28 @@ public class InstanceAssignmentTest {
       instanceConfigs.add(instanceConfig);
     }
     numInstancesPerPartition = 3;
-    tableConfig.getValidationConfig()
-        .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerPartition));
+    instanceAssignmentConfig = new InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, true,
+            partitionColumn), null, true);
+    tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfig));
 
     instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
instancePartitions);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
     assertEquals(instancePartitions.getNumPartitions(), numPartitions);
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 12 = 2
-    // [i10, i11, i12, i13, i3, i4, i5, i11, i7, i8, i9, i0, i1]
+    // [i10, i11, i12, i13, i3, i4, i5, i7, i8, i9, i0, i1]
+    //   r1   r2   r0   r1  r2  r0  r1  r2  r0  r1  r2  r0
     // r0: [i8, i1, i4, i12]
-    //      p0, p0, p1, p0
-    //      p1, p1
+    //      p0  p0  p1   p0
+    //      p1  p1
     // r1: [i9, i5, i10, i13]
-    //      p0, p1, p0,  p0
-    //      p1,     p1
+    //      p0  p1   p0   p0
+    //      p1       p1
     // r2: [i0, i3, i11, i7]
-    //      p0, p0, p1,  p0
-    //      p1, p1
+    //      p0  p0   p1  p0
+    //      p1  p1
     assertEquals(instancePartitions.getInstances(0, 0),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX 
+ 1, SERVER_INSTANCE_ID_PREFIX + 12));
     assertEquals(instancePartitions.getInstances(1, 0),
@@ -251,86 +277,227 @@ public class InstanceAssignmentTest {
     assertEquals(instancePartitions.getInstances(0, 1),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX 
+ 10, SERVER_INSTANCE_ID_PREFIX + 13));
     assertEquals(instancePartitions.getInstances(1, 1),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX 
+ 9, SERVER_INSTANCE_ID_PREFIX + 10));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX 
+ 10, SERVER_INSTANCE_ID_PREFIX + 9));
     assertEquals(instancePartitions.getInstances(0, 2),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 3, SERVER_INSTANCE_ID_PREFIX + 7));
     assertEquals(instancePartitions.getInstances(1, 2),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, 
SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, 
SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 0));
 
     // Reduce the number of instances per partition from 3 to 2.
     numInstancesPerPartition = 2;
-    tableConfig.getValidationConfig()
-        .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerPartition));
+    instanceAssignmentConfig = new InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, true,
+            partitionColumn), null, true);
+    tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfig));
 
     instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
instancePartitions);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
     assertEquals(instancePartitions.getNumPartitions(), numPartitions);
 
-    // The instance assignment should be the same as the one without the newly 
added instances.
+    // r0: [i8, i1, i4, i12]
+    //      p0  p0  p1   p1
+    // r1: [i9, i5, i10, i13]
+    //      p0  p1   p0   p1
+    // r2: [i0, i3, i11, i7]
+    //      p0  p0   p1  p1
     assertEquals(instancePartitions.getInstances(0, 0),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX 
+ 1));
     assertEquals(instancePartitions.getInstances(1, 0),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX 
+ 8));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX 
+ 12));
     assertEquals(instancePartitions.getInstances(0, 1),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX 
+ 10));
     assertEquals(instancePartitions.getInstances(1, 1),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX 
+ 9));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX 
+ 13));
     assertEquals(instancePartitions.getInstances(0, 2),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 3));
     assertEquals(instancePartitions.getInstances(1, 2),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, 
SERVER_INSTANCE_ID_PREFIX + 0));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, 
SERVER_INSTANCE_ID_PREFIX + 7));
 
     // Add one more replica group (from 3 to 4).
     numReplicas = 4;
     
tableConfig.getValidationConfig().setReplication(Integer.toString(numReplicas));
+    instanceAssignmentConfig = new InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, true,
+            partitionColumn), null, true);
+    tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfig));
     instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
instancePartitions);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
     assertEquals(instancePartitions.getNumPartitions(), numPartitions);
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 12 = 2
-    // [i10, i11, i12, i13, i3, i4, i5, i11, i7, i8, i9, i0, i1]
-    // The existing replica groups remain unchanged.
-    // For the new replica group r3, the candidate instances become [i12, i13, 
i7].
-    // r3: [i12, i13, i7]
-    //       p0, p0, p1
-    //       p1
+    // [i10, i11, i12, i13, i3, i4, i5, i7, i8, i9, i0, i1]
+    //   r1   r2   r0   r1  r2  r0  r1  r2  r0  r3  r3  r3
+    // r0: [i8, i4, i12]
+    //      p0  p1   p1
+    //               p0
+    // r1: [i5, i10, i13]
+    //      p1   p0   p1
+    //                p0
+    // r2: [i3, i11, i7]
+    //      p0   p1  p1
+    //           p0
+    // r3: [i9, i0, i1]
+    //      p0  p0  p1
+    //      p1
     assertEquals(instancePartitions.getInstances(0, 0),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX 
+ 1));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX 
+ 12));
     assertEquals(instancePartitions.getInstances(1, 0),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX 
+ 8));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX 
+ 12));
     assertEquals(instancePartitions.getInstances(0, 1),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX 
+ 10));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 13, 
SERVER_INSTANCE_ID_PREFIX + 10));
     assertEquals(instancePartitions.getInstances(1, 1),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX 
+ 9));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX 
+ 13));
     assertEquals(instancePartitions.getInstances(0, 2),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 3));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, 
SERVER_INSTANCE_ID_PREFIX + 3));
     assertEquals(instancePartitions.getInstances(1, 2),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, 
SERVER_INSTANCE_ID_PREFIX + 0));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, 
SERVER_INSTANCE_ID_PREFIX + 7));
     assertEquals(instancePartitions.getInstances(0, 3),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, 
SERVER_INSTANCE_ID_PREFIX + 13));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX 
+ 0));
     assertEquals(instancePartitions.getInstances(1, 3),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX 
+ 12));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 9));
 
     // Remove one replica group (from 4 to 3).
     numReplicas = 3;
     
tableConfig.getValidationConfig().setReplication(Integer.toString(numReplicas));
+    
tableConfig.getValidationConfig().setReplication(Integer.toString(numReplicas));
+    instanceAssignmentConfig = new InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, true,
+            partitionColumn), null, true);
+    tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfig));
     instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
instancePartitions);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
     assertEquals(instancePartitions.getNumPartitions(), numPartitions);
 
-    // The output should be the same as the one before adding one replica 
group.
+    // Math.abs("myTable_OFFLINE".hashCode()) % 12 = 2
+    // [i10, i11, i12, i13, i3, i4, i5, i7, i8, i9, i0, i1]
+    //   r1   r2   r0   r1  r2  r0  r1  r2  r0  r0  r1  r2
+    // r0: [i8, i4, i12, i9]
+    //      p0  p1   p0  p1
+    // r1: [i5, i10, i13, i0]
+    //      p1   p0   p0  p1
+    // r2: [i3, i11, i7, i1]
+    //      p0   p0  p1  p1
     assertEquals(instancePartitions.getInstances(0, 0),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX 
+ 1));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX 
+ 12));
     assertEquals(instancePartitions.getInstances(1, 0),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX 
+ 8));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX 
+ 9));
     assertEquals(instancePartitions.getInstances(0, 1),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX 
+ 10));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 13, 
SERVER_INSTANCE_ID_PREFIX + 10));
     assertEquals(instancePartitions.getInstances(1, 1),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX 
+ 9));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX 
+ 0));
     assertEquals(instancePartitions.getInstances(0, 2),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 3));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, 
SERVER_INSTANCE_ID_PREFIX + 3));
     assertEquals(instancePartitions.getInstances(1, 2),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, 
SERVER_INSTANCE_ID_PREFIX + 0));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 7));
+  }
+
+  @Test
+  public void testMinimizeDataMovementPoolBasedSingleInstancePartitions() {
+    int numReplicas = 2;
+    int numPartitions = 10;
+    int numInstancesPerPartition = 1;
+    String partitionColumn = "partition";
+    InstanceAssignmentConfig instanceAssignmentConfig = new 
InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), true, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, true,
+            partitionColumn), null, true);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        .setNumReplicas(numReplicas)
+        .setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfig))
+        .build();
+
+    int numPools = 2;
+    int numInstances = 6;
+    List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
+    for (int i = 0; i < numInstances; i++) {
+      InstanceConfig instanceConfig = new 
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, Map.of(OFFLINE_TAG, 
Integer.toString(i % numPools)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Start without existing InstancePartitions:
+    // Instances from each pool should be assigned to 1 replica-group, each 
with 3 instances, then these 3 instances
+    // should be assigned to 10 partitions, each with 1 instance
+    InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
+    InstancePartitions instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, null);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+    // [i4, i0, i2]
+    // [i5, i1, i3]
+    //  p0  p1  p2
+    //  p3  p4  p5
+    //  p6  p7  p8
+    //  p9
+    assertEquals(instancePartitions.getInstances(0, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(0, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+    assertEquals(instancePartitions.getInstances(1, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+    assertEquals(instancePartitions.getInstances(1, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+    assertEquals(instancePartitions.getInstances(2, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+    assertEquals(instancePartitions.getInstances(2, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(3, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(3, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+    assertEquals(instancePartitions.getInstances(4, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+    assertEquals(instancePartitions.getInstances(4, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+    assertEquals(instancePartitions.getInstances(5, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+    assertEquals(instancePartitions.getInstances(5, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(6, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(6, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+    assertEquals(instancePartitions.getInstances(7, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+    assertEquals(instancePartitions.getInstances(7, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+    assertEquals(instancePartitions.getInstances(8, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+    assertEquals(instancePartitions.getInstances(8, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(9, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(9, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+
+    // Add 2 new instances
+    // Each existing instance should keep 3 partitions unmoved, and only 1 
partition should be moved to the new instance
+    for (int i = numInstances; i < numInstances + 2; i++) {
+      InstanceConfig instanceConfig = new 
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, Map.of(OFFLINE_TAG, 
Integer.toString(i % numPools)));
+      instanceConfigs.add(instanceConfig);
+    }
+    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
instancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // Math.abs("myTable_OFFLINE".hashCode()) % 4 = 2
+    // [i4, i6, i0, i2]
+    // [i5, i7, i1, i3]
+    //  p0  p9  p1  p2
+    //  p3      p4  p5
+    //  p6      p7  p8
+    assertEquals(instancePartitions.getInstances(0, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(0, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+    assertEquals(instancePartitions.getInstances(1, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+    assertEquals(instancePartitions.getInstances(1, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+    assertEquals(instancePartitions.getInstances(2, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+    assertEquals(instancePartitions.getInstances(2, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(3, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(3, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+    assertEquals(instancePartitions.getInstances(4, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+    assertEquals(instancePartitions.getInstances(4, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+    assertEquals(instancePartitions.getInstances(5, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+    assertEquals(instancePartitions.getInstances(5, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(6, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(6, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+    assertEquals(instancePartitions.getInstances(7, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+    assertEquals(instancePartitions.getInstances(7, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+    assertEquals(instancePartitions.getInstances(8, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+    assertEquals(instancePartitions.getInstances(8, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(9, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 6));
+    assertEquals(instancePartitions.getInstances(9, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 7));
   }
 
   public void testMirrorServerSetBasedRandom() throws FileNotFoundException {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
index 0b8a403041..592a6c1960 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
@@ -21,6 +21,7 @@ package org.apache.pinot.spi.config.table;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.config.BaseJsonConfig;
+import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.utils.TimeUtils;
 
@@ -43,20 +44,26 @@ public class SegmentsValidationAndRetentionConfig extends 
BaseJsonConfig {
   private TimeUnit _timeType;
   @Deprecated  // Use SegmentAssignmentConfig instead
   private String _segmentAssignmentStrategy;
+  @Deprecated  // Use SegmentAssignmentConfig instead
   private ReplicaGroupStrategyConfig _replicaGroupStrategyConfig;
   private CompletionConfig _completionConfig;
   private String _crypterClassName;
+  @Deprecated
   private boolean _minimizeDataMovement;
   // Possible values can be http or https. If this field is set, a Pinot 
server can download segments from peer servers
   // using the specified download scheme. Both realtime tables and offline 
tables can set this field.
   // For more usage of this field, please refer to this design doc: 
https://tinyurl.com/f63ru4sb
   private String _peerSegmentDownloadScheme;
 
+  /**
+   * @deprecated Use {@link InstanceAssignmentConfig} instead
+   */
   @Deprecated
   public String getSegmentAssignmentStrategy() {
     return _segmentAssignmentStrategy;
   }
 
+  @Deprecated
   public void setSegmentAssignmentStrategy(String segmentAssignmentStrategy) {
     _segmentAssignmentStrategy = segmentAssignmentStrategy;
   }
@@ -174,10 +181,15 @@ public class SegmentsValidationAndRetentionConfig extends 
BaseJsonConfig {
     _schemaName = schemaName;
   }
 
+  /**
+   * @deprecated Use {@link InstanceAssignmentConfig} instead.
+   */
+  @Deprecated
   public ReplicaGroupStrategyConfig getReplicaGroupStrategyConfig() {
     return _replicaGroupStrategyConfig;
   }
 
+  @Deprecated
   public void setReplicaGroupStrategyConfig(ReplicaGroupStrategyConfig 
replicaGroupStrategyConfig) {
     _replicaGroupStrategyConfig = replicaGroupStrategyConfig;
   }
@@ -226,10 +238,15 @@ public class SegmentsValidationAndRetentionConfig extends 
BaseJsonConfig {
     _crypterClassName = crypterClassName;
   }
 
+  /**
+   * @deprecated Use {@link InstanceAssignmentConfig} instead
+   */
+  @Deprecated
   public boolean isMinimizeDataMovement() {
     return _minimizeDataMovement;
   }
 
+  @Deprecated
   public void setMinimizeDataMovement(boolean minimizeDataMovement) {
     _minimizeDataMovement = minimizeDataMovement;
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index 5e9d915cfc..3b51a6052f 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -78,6 +78,7 @@ public class TableConfigBuilder {
   @Deprecated
   private String _segmentAssignmentStrategy;
   private String _peerSegmentDownloadScheme;
+  @Deprecated
   private ReplicaGroupStrategyConfig _replicaGroupStrategyConfig;
   private CompletionConfig _completionConfig;
   private String _crypterClassName;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to