jackjlli commented on code in PR #11953:
URL: https://github.com/apache/pinot/pull/11953#discussion_r1396731814
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -73,16 +76,65 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
- for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
- // Pick one pool for each replica-group based on the table name hash
- int pool = pools.get((tableNameHash + replicaId) % numPools);
- poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
- replicaGroupIdToPoolMap.put(replicaId, pool);
+ Map<String, Integer> instanceToPoolMap = new HashMap<>();
+ for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+ Integer pool = entry.getKey();
+ List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+ Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+ for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+ String instanceName = instanceConfig.getInstanceName();
+ candidateInstances.add(instanceName);
+ instanceToPoolMap.put(instanceName, pool);
+ }
+ }
- Set<String> candidateInstances =
- poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
- List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
- instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+ if (_minimizeDataMovement && _existingInstancePartitions != null) {
+ // Keep the same pool for the replica group if it's already been used for the table.
+ int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+ int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+ int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
Review Comment:
The reason for checking the common replica groups is that numReplicaGroups
does not always increase. If the number of replica groups is reduced, we don't
care which pool was used for a stale RG that is no longer needed. That's why
the minimum of the new and existing counts is used here.
The scenarios you mentioned are covered by the min heap in Line 112, which
tracks each pool number along with the number of times it has been chosen.
The least frequently used pool is always the one chosen for the next RG.
Keep in mind that 1 RG can only have 1 pool (RG -> pool), while 1 pool
may have more than 1 RG (pool -> [RG]).
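To make the two points above concrete, here is a rough sketch of the idea. It is not the code in this PR: the class `PoolAssignmentSketch`, the method `assignPools`, and the `existingPoolPerRg` list (existing pool for each RG, indexed by RG id) are hypothetical names used only for illustration, and ties between equally used pools are broken by pool id as an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Illustrative sketch only; names and tie-breaking are assumptions, not Pinot code.
class PoolAssignmentSketch {

  // Returns RG id -> pool. Each RG gets exactly one pool; a pool may serve several RGs.
  static Map<Integer, Integer> assignPools(List<Integer> pools, int numReplicaGroups,
      List<Integer> existingPoolPerRg) {
    if (pools.isEmpty()) {
      throw new IllegalArgumentException("At least one pool is required");
    }
    Map<Integer, Integer> rgToPool = new TreeMap<>();
    Map<Integer, Integer> usage = new TreeMap<>();
    for (int pool : pools) {
      usage.put(pool, 0);
    }

    // Only RGs present both before and after the change keep their pool.
    // Stale RGs (ids >= numReplicaGroups) are ignored when the count shrinks.
    int numCommonReplicaGroups = Math.min(numReplicaGroups, existingPoolPerRg.size());
    for (int rg = 0; rg < numCommonReplicaGroups; rg++) {
      Integer pool = existingPoolPerRg.get(rg);
      if (pool != null && usage.containsKey(pool)) {
        rgToPool.put(rg, pool);
        usage.merge(pool, 1, Integer::sum);
      }
    }

    // Min heap of {pool, timesChosen}: least-used pool first, then lowest pool id.
    PriorityQueue<int[]> heap = new PriorityQueue<>(
        (a, b) -> a[1] != b[1] ? Integer.compare(a[1], b[1]) : Integer.compare(a[0], b[0]));
    usage.forEach((pool, count) -> heap.add(new int[]{pool, count}));

    // Remaining RGs always take the least frequently used pool.
    for (int rg = 0; rg < numReplicaGroups; rg++) {
      if (rgToPool.containsKey(rg)) {
        continue;
      }
      int[] least = heap.poll();
      rgToPool.put(rg, least[0]);
      least[1]++;
      heap.add(least);
    }
    return rgToPool;
  }
}
```

For example, with pools [0, 1], existing RGs on pools [0, 1] and numReplicaGroups raised to 3, RG 0 and RG 1 stay where they are and RG 2 goes to whichever pool is least used. With existing RGs on pools [1, 0, 1] and numReplicaGroups reduced to 1, only RG 0 is considered and it stays on pool 1.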
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]