jackjlli commented on code in PR #8483:
URL: https://github.com/apache/pinot/pull/8483#discussion_r856589037
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +133,118 @@ public void selectInstances(Map<Integer,
List<InstanceConfig>> poolToInstanceCon
LOGGER.info("Selecting {} partitions, {} instances per partition within
a replica-group for table: {}",
numPartitions, numInstancesPerPartition, _tableNameWithType);
- // Assign consecutive instances within a replica-group to each partition.
- // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances
per partition)
- // [i0, i1, i2, i3, i4]
- // p0 p0 p0 p1 p1
- // p1 p2 p2 p2
- for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups;
replicaGroupId++) {
- int instanceIdInReplicaGroup = 0;
- for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
- List<String> instancesInPartition = new
ArrayList<>(numInstancesPerPartition);
- for (int instanceIdInPartition = 0; instanceIdInPartition <
numInstancesPerPartition;
- instanceIdInPartition++) {
-
instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
- instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) %
numInstancesPerReplicaGroup;
+ if (_replicaGroupPartitionConfig.isMinimizeDataMovement() &&
_existingInstancePartitions != null) {
+ // Minimize data movement.
+ int existingNumPartitions =
_existingInstancePartitions.getNumPartitions();
+ int existingNumReplicaGroups =
_existingInstancePartitions.getNumReplicaGroups();
+
+ Map<Integer, Set<String>> poolToCandidateInstancesMap = new
TreeMap<>();
+ Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new
HashMap<>();
+ for (int replicaGroupId = 0; replicaGroupId <
existingNumReplicaGroups; replicaGroupId++) {
+ Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+ if (pool == null) {
+ // Skip the replica group if it's no longer needed.
+ continue;
+ }
+ Set<String> candidateInstances =
+ poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new
LinkedHashSet<>());
+ List<InstanceConfig> instanceConfigsInPool =
poolToInstanceConfigsMap.get(pool);
+ instanceConfigsInPool.forEach(k ->
candidateInstances.add(k.getInstanceName()));
+
+ for (int partitionId = 0; partitionId < existingNumPartitions;
partitionId++) {
+ List<String> existingInstances =
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+ replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k ->
new HashSet<>())
+ .addAll(existingInstances);
+ }
+ }
+
+ for (int replicaGroupId = 0; replicaGroupId <
existingNumReplicaGroups; replicaGroupId++) {
Review Comment:
I also thought about that, but the thing is that if number of replica groups
increased, the number of instances per replica groups will be reduced. If that
case, the ones that used to belong to the original RG but didn't get picked up
in the same RG cannot be used for the new RG. I'll just leave it as it is right
now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]