This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 351e4641cc Fix a bug for partition enabled instance assignment with
minimize movement (#14726)
351e4641cc is described below
commit 351e4641cc21a053e54ab67cfce65f57e562ce6d
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Dec 30 18:29:11 2024 -0800
Fix a bug for partition enabled instance assignment with minimize movement
(#14726)
---
.../InstanceReplicaGroupPartitionSelector.java | 3 +-
.../instance/InstanceAssignmentTest.java | 279 ++++++++++++++++-----
.../SegmentsValidationAndRetentionConfig.java | 17 ++
.../spi/utils/builder/TableConfigBuilder.java | 1 +
4 files changed, 243 insertions(+), 57 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index 8da6dbe2f6..b8c19ede69 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -411,7 +411,8 @@ public class InstanceReplicaGroupPartitionSelector extends
InstancePartitionSele
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups;
replicaGroupId++) {
List<String> instancesInReplicaGroup =
replicaGroupIdToInstancesMap.get(replicaGroupId);
if (replicaGroupId < existingNumReplicaGroups) {
- int maxNumPartitionsPerInstance = (numInstancesPerReplicaGroup +
numPartitions - 1) / numPartitions;
+ int maxNumPartitionsPerInstance =
+ (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
Map<String, Integer> instanceToNumPartitionsMap =
Maps.newHashMapWithExpectedSize(numInstancesPerReplicaGroup);
for (String instance : instancesInReplicaGroup) {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 113d4e1649..39aef7f35a 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.helix.model.InstanceConfig;
@@ -115,15 +116,15 @@ public class InstanceAssignmentTest {
// Instance of index 7 is not assigned because of the hash-based rotation
// Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
// [i8, i9, i0, i1, i2, i3, i4, i5, i6, i7]
- // r0, r1, r2, r0, r1, r2, r0, r1, r2
+ // r0 r1 r2 r0 r1 r2 r0 r1 r2
// r0: [i8, i1, i4]
- // p0, p0, p1
+ // p0 p0 p1
// p1
// r1: [i9, i2, i5]
- // p0, p0, p1
+ // p0 p0 p1
// p1
// r2: [i0, i3, i6]
- // p0, p0, p1
+ // p0 p0 p1
// p1
assertEquals(instancePartitions.getInstances(0, 0),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX
+ 1));
@@ -137,31 +138,52 @@ public class InstanceAssignmentTest {
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX
+ 3));
assertEquals(instancePartitions.getInstances(1, 2),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX
+ 0));
+ }
- // ===== Test against the cases when the existing instancePartitions isn't
null,
- // and minimizeDataMovement is set to true. =====
- // Put the existing instancePartitions as the parameter to the
InstanceAssignmentDriver.
- // The returned instance partition should be the same as the last computed
one.
- tableConfig.getValidationConfig().setMinimizeDataMovement(true);
+ @Test
+ public void testMinimizeDataMovement() {
+ int numReplicas = 3;
+ int numPartitions = 2;
+ int numInstancesPerPartition = 2;
+ String partitionColumn = "partition";
+ InstanceAssignmentConfig instanceAssignmentConfig = new
InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, true,
+ partitionColumn), null, true);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(numReplicas)
+ .setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig))
+ .build();
+
+ int numInstances = 10;
+ List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
+ for (int i = 0; i < numInstances; i++) {
+ InstanceConfig instanceConfig = new
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ instanceConfig.addTag(OFFLINE_TAG);
+ instanceConfigs.add(instanceConfig);
+ }
+ // Start without existing InstancePartitions:
// Instances should be assigned to 3 replica-groups with a round-robin
fashion, each with 3 instances, then these 3
// instances should be assigned to 2 partitions, each with 2 instances
- instancePartitions =
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs,
instancePartitions);
+ InstanceAssignmentDriver driver = new
InstanceAssignmentDriver(tableConfig);
+ InstancePartitions instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, null);
assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
assertEquals(instancePartitions.getNumPartitions(), numPartitions);
// Instance of index 7 is not assigned because of the hash-based rotation
// Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
// [i8, i9, i0, i1, i2, i3, i4, i5, i6, i7]
- // r0, r1, r2, r0, r1, r2, r0, r1, r2
+ // r0 r1 r2 r0 r1 r2 r0 r1 r2
// r0: [i8, i1, i4]
- // p0, p0, p1
+ // p0 p0 p1
// p1
// r1: [i9, i2, i5]
- // p0, p0, p1
+ // p0 p0 p1
// p1
// r2: [i0, i3, i6]
- // p0, p0, p1
+ // p0 p0 p1
// p1
assertEquals(instancePartitions.getInstances(0, 0),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX
+ 1));
@@ -196,15 +218,15 @@ public class InstanceAssignmentTest {
// Instance of index 7 is not assigned because of the hash-based rotation
// Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
// [i8, i9, i0, i1, i10, i3, i4, i5, i11, i7]
- // r0, r1, r2, r0, r1, r2, r0, r1, r2
+ // r0 r1 r2 r0 r1 r2 r0 r1 r2
// r0: [i8, i1, i4]
- // p0, p0, p1
+ // p0 p0 p1
// p1
// r1: [i9, i5, i10]
- // p0, p1, p0
+ // p0 p1 p0
// p1
// r2: [i0, i3, i11]
- // p0, p0, p1
+ // p0 p0 p1
// p1
assertEquals(instancePartitions.getInstances(0, 0),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX
+ 1));
@@ -226,24 +248,28 @@ public class InstanceAssignmentTest {
instanceConfigs.add(instanceConfig);
}
numInstancesPerPartition = 3;
- tableConfig.getValidationConfig()
- .setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerPartition));
+ instanceAssignmentConfig = new InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, true,
+ partitionColumn), null, true);
+ tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig));
instancePartitions =
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs,
instancePartitions);
assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
assertEquals(instancePartitions.getNumPartitions(), numPartitions);
// Math.abs("myTable_OFFLINE".hashCode()) % 12 = 2
- // [i10, i11, i12, i13, i3, i4, i5, i11, i7, i8, i9, i0, i1]
+ // [i10, i11, i12, i13, i3, i4, i5, i7, i8, i9, i0, i1]
+ // r1 r2 r0 r1 r2 r0 r1 r2 r0 r1 r2 r0
// r0: [i8, i1, i4, i12]
- // p0, p0, p1, p0
- // p1, p1
+ // p0 p0 p1 p0
+ // p1 p1
// r1: [i9, i5, i10, i13]
- // p0, p1, p0, p0
- // p1, p1
+ // p0 p1 p0 p0
+ // p1 p1
// r2: [i0, i3, i11, i7]
- // p0, p0, p1, p0
- // p1, p1
+ // p0 p0 p1 p0
+ // p1 p1
assertEquals(instancePartitions.getInstances(0, 0),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX
+ 1, SERVER_INSTANCE_ID_PREFIX + 12));
assertEquals(instancePartitions.getInstances(1, 0),
@@ -251,86 +277,227 @@ public class InstanceAssignmentTest {
assertEquals(instancePartitions.getInstances(0, 1),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX
+ 10, SERVER_INSTANCE_ID_PREFIX + 13));
assertEquals(instancePartitions.getInstances(1, 1),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX
+ 9, SERVER_INSTANCE_ID_PREFIX + 10));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX
+ 10, SERVER_INSTANCE_ID_PREFIX + 9));
assertEquals(instancePartitions.getInstances(0, 2),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX
+ 3, SERVER_INSTANCE_ID_PREFIX + 7));
assertEquals(instancePartitions.getInstances(1, 2),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11,
SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11,
SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 0));
// Reduce the number of instances per partition from 3 to 2.
numInstancesPerPartition = 2;
- tableConfig.getValidationConfig()
- .setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerPartition));
+ instanceAssignmentConfig = new InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, true,
+ partitionColumn), null, true);
+ tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig));
instancePartitions =
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs,
instancePartitions);
assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
assertEquals(instancePartitions.getNumPartitions(), numPartitions);
- // The instance assignment should be the same as the one without the newly
added instances.
+ // r0: [i8, i1, i4, i12]
+ // p0 p0 p1 p1
+ // r1: [i9, i5, i10, i13]
+ // p0 p1 p0 p1
+ // r2: [i0, i3, i11, i7]
+ // p0 p0 p1 p1
assertEquals(instancePartitions.getInstances(0, 0),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX
+ 1));
assertEquals(instancePartitions.getInstances(1, 0),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX
+ 8));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX
+ 12));
assertEquals(instancePartitions.getInstances(0, 1),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX
+ 10));
assertEquals(instancePartitions.getInstances(1, 1),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX
+ 9));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX
+ 13));
assertEquals(instancePartitions.getInstances(0, 2),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX
+ 3));
assertEquals(instancePartitions.getInstances(1, 2),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11,
SERVER_INSTANCE_ID_PREFIX + 0));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11,
SERVER_INSTANCE_ID_PREFIX + 7));
// Add one more replica group (from 3 to 4).
numReplicas = 4;
tableConfig.getValidationConfig().setReplication(Integer.toString(numReplicas));
+ instanceAssignmentConfig = new InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, true,
+ partitionColumn), null, true);
+ tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig));
instancePartitions =
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs,
instancePartitions);
assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
assertEquals(instancePartitions.getNumPartitions(), numPartitions);
// Math.abs("myTable_OFFLINE".hashCode()) % 12 = 2
- // [i10, i11, i12, i13, i3, i4, i5, i11, i7, i8, i9, i0, i1]
- // The existing replica groups remain unchanged.
- // For the new replica group r3, the candidate instances become [i12, i13,
i7].
- // r3: [i12, i13, i7]
- // p0, p0, p1
- // p1
+ // [i10, i11, i12, i13, i3, i4, i5, i7, i8, i9, i0, i1]
+ // r1 r2 r0 r1 r2 r0 r1 r2 r0 r3 r3 r3
+ // r0: [i8, i4, i12]
+ // p0 p1 p1
+ // p0
+ // r1: [i5, i10, i13]
+ // p1 p0 p1
+ // p0
+ // r2: [i3, i11, i7]
+ // p0 p1 p1
+ // p0
+ // r3: [i9, i0, i1]
+ // p0 p0 p1
+ // p1
assertEquals(instancePartitions.getInstances(0, 0),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX
+ 1));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX
+ 12));
assertEquals(instancePartitions.getInstances(1, 0),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX
+ 8));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX
+ 12));
assertEquals(instancePartitions.getInstances(0, 1),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX
+ 10));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 13,
SERVER_INSTANCE_ID_PREFIX + 10));
assertEquals(instancePartitions.getInstances(1, 1),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX
+ 9));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX
+ 13));
assertEquals(instancePartitions.getInstances(0, 2),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX
+ 3));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11,
SERVER_INSTANCE_ID_PREFIX + 3));
assertEquals(instancePartitions.getInstances(1, 2),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11,
SERVER_INSTANCE_ID_PREFIX + 0));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11,
SERVER_INSTANCE_ID_PREFIX + 7));
assertEquals(instancePartitions.getInstances(0, 3),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12,
SERVER_INSTANCE_ID_PREFIX + 13));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX
+ 0));
assertEquals(instancePartitions.getInstances(1, 3),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX
+ 12));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX
+ 9));
// Remove one replica group (from 4 to 3).
numReplicas = 3;
tableConfig.getValidationConfig().setReplication(Integer.toString(numReplicas));
+
tableConfig.getValidationConfig().setReplication(Integer.toString(numReplicas));
+ instanceAssignmentConfig = new InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, true,
+ partitionColumn), null, true);
+ tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig));
instancePartitions =
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs,
instancePartitions);
assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
assertEquals(instancePartitions.getNumPartitions(), numPartitions);
- // The output should be the same as the one before adding one replica
group.
+ // Math.abs("myTable_OFFLINE".hashCode()) % 12 = 2
+ // [i10, i11, i12, i13, i3, i4, i5, i7, i8, i9, i0, i1]
+ // r1 r2 r0 r1 r2 r0 r1 r2 r0 r0 r1 r2
+ // r0: [i8, i4, i12, i9]
+ // p0 p1 p0 p1
+ // r1: [i5, i10, i13, i0]
+ // p1 p0 p0 p1
+ // r2: [i3, i11, i7, i1]
+ // p0 p0 p1 p1
assertEquals(instancePartitions.getInstances(0, 0),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX
+ 1));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX
+ 12));
assertEquals(instancePartitions.getInstances(1, 0),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX
+ 8));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX
+ 9));
assertEquals(instancePartitions.getInstances(0, 1),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX
+ 10));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 13,
SERVER_INSTANCE_ID_PREFIX + 10));
assertEquals(instancePartitions.getInstances(1, 1),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX
+ 9));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX
+ 0));
assertEquals(instancePartitions.getInstances(0, 2),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX
+ 3));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11,
SERVER_INSTANCE_ID_PREFIX + 3));
assertEquals(instancePartitions.getInstances(1, 2),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11,
SERVER_INSTANCE_ID_PREFIX + 0));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX
+ 7));
+ }
+
+ @Test
+ public void testMinimizeDataMovementPoolBasedSingleInstancePartitions() {
+ int numReplicas = 2;
+ int numPartitions = 10;
+ int numInstancesPerPartition = 1;
+ String partitionColumn = "partition";
+ InstanceAssignmentConfig instanceAssignmentConfig = new
InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), true,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, true,
+ partitionColumn), null, true);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(numReplicas)
+ .setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig))
+ .build();
+
+ int numPools = 2;
+ int numInstances = 6;
+ List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
+ for (int i = 0; i < numInstances; i++) {
+ InstanceConfig instanceConfig = new
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ instanceConfig.addTag(OFFLINE_TAG);
+ instanceConfig.getRecord()
+ .setMapField(InstanceUtils.POOL_KEY, Map.of(OFFLINE_TAG,
Integer.toString(i % numPools)));
+ instanceConfigs.add(instanceConfig);
+ }
+
+ // Start without existing InstancePartitions:
+ // Instances from each pool should be assigned to 1 replica-group, each
with 3 instances, then these 3 instances
+ // should be assigned to 10 partitions, each with 1 instance
+ InstanceAssignmentDriver driver = new
InstanceAssignmentDriver(tableConfig);
+ InstancePartitions instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, null);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+ assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+ // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+ // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+ // [i4, i0, i2]
+ // [i5, i1, i3]
+ // p0 p1 p2
+ // p3 p4 p5
+ // p6 p7 p8
+ // p9
+ assertEquals(instancePartitions.getInstances(0, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+ assertEquals(instancePartitions.getInstances(0, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+ assertEquals(instancePartitions.getInstances(1, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+ assertEquals(instancePartitions.getInstances(1, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+ assertEquals(instancePartitions.getInstances(2, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+ assertEquals(instancePartitions.getInstances(2, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+ assertEquals(instancePartitions.getInstances(3, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+ assertEquals(instancePartitions.getInstances(3, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+ assertEquals(instancePartitions.getInstances(4, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+ assertEquals(instancePartitions.getInstances(4, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+ assertEquals(instancePartitions.getInstances(5, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+ assertEquals(instancePartitions.getInstances(5, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+ assertEquals(instancePartitions.getInstances(6, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+ assertEquals(instancePartitions.getInstances(6, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+ assertEquals(instancePartitions.getInstances(7, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+ assertEquals(instancePartitions.getInstances(7, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+ assertEquals(instancePartitions.getInstances(8, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+ assertEquals(instancePartitions.getInstances(8, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+ assertEquals(instancePartitions.getInstances(9, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+ assertEquals(instancePartitions.getInstances(9, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+
+ // Add 2 new instances, one per pool (i6 -> pool 0, i7 -> pool 1)
+ // Each existing instance should keep at most ceil(10 / 4) = 3 partitions unmoved, so only 1
partition per replica group (p9) should be moved to the new instance
+ for (int i = numInstances; i < numInstances + 2; i++) {
+ InstanceConfig instanceConfig = new
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ instanceConfig.addTag(OFFLINE_TAG);
+ instanceConfig.getRecord()
+ .setMapField(InstanceUtils.POOL_KEY, Map.of(OFFLINE_TAG,
Integer.toString(i % numPools)));
+ instanceConfigs.add(instanceConfig);
+ }
+ instancePartitions =
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs,
instancePartitions);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+ assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+ // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+ // Math.abs("myTable_OFFLINE".hashCode()) % 4 = 2
+ // [i4, i6, i0, i2]
+ // [i5, i7, i1, i3]
+ // p0 p9 p1 p2
+ // p3 p4 p5
+ // p6 p7 p8
+ assertEquals(instancePartitions.getInstances(0, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+ assertEquals(instancePartitions.getInstances(0, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+ assertEquals(instancePartitions.getInstances(1, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+ assertEquals(instancePartitions.getInstances(1, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+ assertEquals(instancePartitions.getInstances(2, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+ assertEquals(instancePartitions.getInstances(2, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+ assertEquals(instancePartitions.getInstances(3, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+ assertEquals(instancePartitions.getInstances(3, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+ assertEquals(instancePartitions.getInstances(4, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+ assertEquals(instancePartitions.getInstances(4, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+ assertEquals(instancePartitions.getInstances(5, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+ assertEquals(instancePartitions.getInstances(5, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+ assertEquals(instancePartitions.getInstances(6, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+ assertEquals(instancePartitions.getInstances(6, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+ assertEquals(instancePartitions.getInstances(7, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+ assertEquals(instancePartitions.getInstances(7, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+ assertEquals(instancePartitions.getInstances(8, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+ assertEquals(instancePartitions.getInstances(8, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+ assertEquals(instancePartitions.getInstances(9, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 6));
+ assertEquals(instancePartitions.getInstances(9, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 7));
}
public void testMirrorServerSetBasedRandom() throws FileNotFoundException {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
index 0b8a403041..592a6c1960 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
@@ -21,6 +21,7 @@ package org.apache.pinot.spi.config.table;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.config.BaseJsonConfig;
+import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.utils.TimeUtils;
@@ -43,20 +44,26 @@ public class SegmentsValidationAndRetentionConfig extends
BaseJsonConfig {
private TimeUnit _timeType;
@Deprecated // Use SegmentAssignmentConfig instead
private String _segmentAssignmentStrategy;
+ @Deprecated // Use InstanceAssignmentConfig instead
private ReplicaGroupStrategyConfig _replicaGroupStrategyConfig;
private CompletionConfig _completionConfig;
private String _crypterClassName;
+ @Deprecated // Use InstanceAssignmentConfig instead
private boolean _minimizeDataMovement;
// Possible values can be http or https. If this field is set, a Pinot
server can download segments from peer servers
// using the specified download scheme. Both realtime tables and offline
tables can set this field.
// For more usage of this field, please refer to this design doc:
https://tinyurl.com/f63ru4sb
private String _peerSegmentDownloadScheme;
+ /**
+ * @deprecated Use {@code SegmentAssignmentConfig} instead
+ */
@Deprecated
public String getSegmentAssignmentStrategy() {
return _segmentAssignmentStrategy;
}
+ @Deprecated // Use SegmentAssignmentConfig instead
public void setSegmentAssignmentStrategy(String segmentAssignmentStrategy) {
_segmentAssignmentStrategy = segmentAssignmentStrategy;
}
@@ -174,10 +181,15 @@ public class SegmentsValidationAndRetentionConfig extends
BaseJsonConfig {
_schemaName = schemaName;
}
+ /**
+ * @deprecated Use {@link InstanceAssignmentConfig} (replica-group partition config: partition column, instances per partition) instead.
+ */
+ @Deprecated
public ReplicaGroupStrategyConfig getReplicaGroupStrategyConfig() {
return _replicaGroupStrategyConfig;
}
+ @Deprecated // Use InstanceAssignmentConfig instead
public void setReplicaGroupStrategyConfig(ReplicaGroupStrategyConfig
replicaGroupStrategyConfig) {
_replicaGroupStrategyConfig = replicaGroupStrategyConfig;
}
@@ -226,10 +238,15 @@ public class SegmentsValidationAndRetentionConfig extends
BaseJsonConfig {
_crypterClassName = crypterClassName;
}
+ /**
+ * @deprecated Enable data-movement minimization on {@link InstanceAssignmentConfig} instead
+ */
+ @Deprecated
public boolean isMinimizeDataMovement() {
return _minimizeDataMovement;
}
+ @Deprecated // Use InstanceAssignmentConfig instead
public void setMinimizeDataMovement(boolean minimizeDataMovement) {
_minimizeDataMovement = minimizeDataMovement;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index 5e9d915cfc..3b51a6052f 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -78,6 +78,7 @@ public class TableConfigBuilder {
@Deprecated
private String _segmentAssignmentStrategy;
private String _peerSegmentDownloadScheme;
+ @Deprecated // Use InstanceAssignmentConfig instead
private ReplicaGroupStrategyConfig _replicaGroupStrategyConfig;
private CompletionConfig _completionConfig;
private String _crypterClassName;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]