somandal commented on code in PR #11953:
URL: https://github.com/apache/pinot/pull/11953#discussion_r1442339107
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java:
##########
@@ -79,13 +83,26 @@ public boolean equals(Object obj) {
}
public static class AscendingIntPairComparator implements Comparator<IntPair> {
+ private boolean _ascending;
+
+ public AscendingIntPairComparator(boolean ascending) {
Review Comment:
Recommend renaming this class, since it is no longer strictly an "Ascending"
comparator: the boolean you added allows both ascending and descending
comparisons.
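A minimal sketch of what a renamed, direction-aware comparator could look like. `IntPair` here is a hypothetical stand-in for `Pairs.IntPair` (only `getLeft()`/`getRight()` are assumed), `IntPairComparator` is a hypothetical new name, and ordering by the left value with a tie-break on the right value is an assumption, not the PR's actual logic. Passing `false` gives the max heap the selector builds with `Pairs.intPairComparator(false)`.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical stand-in for Pairs.IntPair; here left = existing instance count, right = pool id.
final class IntPair {
  private final int _left;
  private final int _right;

  IntPair(int left, int right) {
    _left = left;
    _right = right;
  }

  int getLeft() {
    return _left;
  }

  int getRight() {
    return _right;
  }
}

// Hypothetical rename of AscendingIntPairComparator: the direction is a constructor
// argument, so the class name no longer promises a fixed order.
final class IntPairComparator implements Comparator<IntPair> {
  private final boolean _ascending;

  IntPairComparator(boolean ascending) {
    _ascending = ascending;
  }

  @Override
  public int compare(IntPair pair1, IntPair pair2) {
    // Descending order is ascending order with the operands swapped.
    IntPair a = _ascending ? pair1 : pair2;
    IntPair b = _ascending ? pair2 : pair1;
    // Assumed ordering: by left value, then by right value to break ties.
    int cmp = Integer.compare(a.getLeft(), b.getLeft());
    return cmp != 0 ? cmp : Integer.compare(a.getRight(), b.getRight());
  }
}

final class ComparatorDemo {
  public static void main(String[] args) {
    // ascending = false turns PriorityQueue (a min-heap by default) into a max-heap.
    PriorityQueue<IntPair> maxHeap = new PriorityQueue<>(3, new IntPairComparator(false));
    maxHeap.add(new IntPair(2, 0));
    maxHeap.add(new IntPair(5, 1));
    maxHeap.add(new IntPair(0, 2));
    while (!maxHeap.isEmpty()) {
      // Pools come out in order of most existing instances: 1, 0, 2.
      System.out.println("pool " + maxHeap.remove().getRight());
    }
  }
}
```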
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -71,18 +74,114 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive");
Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+ Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
+ Map<Integer, Set<Integer>> existingPoolToExistingReplicaGroupIdsMap = new TreeMap<>();
+ Map<Integer, Set<String>> existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
- for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
- // Pick one pool for each replica-group based on the table name hash
- int pool = pools.get((tableNameHash + replicaId) % numPools);
- poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
- replicaGroupIdToPoolMap.put(replicaId, pool);
+ Map<String, Integer> instanceToPoolMap = new HashMap<>();
+ for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+ Integer pool = entry.getKey();
+ List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+ Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+ for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+ String instanceName = instanceConfig.getInstanceName();
+ candidateInstances.add(instanceName);
+ instanceToPoolMap.put(instanceName, pool);
+ }
+ }
+
+ if (_minimizeDataMovement && _existingInstancePartitions != null) {
+ // Collect the stats between the existing pools, existing replica groups, and existing instances.
+ int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+ int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+ for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+ for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+ List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+ for (String existingInstance : existingInstances) {
+ Integer existingPool = instanceToPoolMap.get(existingInstance);
+ if (existingPool != null) {
+ existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+ .add(existingInstance);
+ existingPoolToExistingReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+ .add(replicaGroupId);
+ existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
+ .add(existingInstance);
+ }
+ }
+ }
+ }
+
+ // Use a max heap to track the number of servers used for the given pools,
+ // so that pool with max number of existing instances will be considered first.
+ PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
+ for (int pool : pools) {
+ maxHeap.add(
+ new Pairs.IntPair(existingPoolsToExistingInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).size(),
+ pool));
+ }
- Set<String> candidateInstances =
- poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
- List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
- instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+ // Get the maximum number of replica groups per pool.
+ int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size();
Review Comment:
The comment here is confusing. Should this use the ceil() of the division?
What if `numReplicaGroups` isn't a multiple of the number of pools, e.g. 3
replica groups across 2 pools? Integer division will set the max to 1 instead of 2.
Or is the floor intentional? If so, can you update the comment and variable
name to reflect that this is the minimum number of RGs per pool?
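To make the floor/ceiling distinction concrete, a small sketch; `ReplicaGroupsPerPool`, `minPerPool`, and `maxPerPool` are hypothetical names, not code from the PR. The ceiling uses the usual integer-only rounding idiom, which assumes a non-negative numerator and a positive divisor.

```java
// Hypothetical helper contrasting the two readings of "replica groups per pool".
final class ReplicaGroupsPerPool {
  // Floor: what `numReplicaGroups / pools.size()` computes today (a lower bound per pool).
  static int minPerPool(int numReplicaGroups, int numPools) {
    return numReplicaGroups / numPools;
  }

  // Ceiling: an upper bound that still holds when groups don't divide evenly across pools.
  // Valid for numReplicaGroups >= 0 and numPools > 0.
  static int maxPerPool(int numReplicaGroups, int numPools) {
    return (numReplicaGroups + numPools - 1) / numPools;
  }

  public static void main(String[] args) {
    // 3 replica groups across 2 pools: floor is 1, ceiling is 2.
    System.out.println(minPerPool(3, 2) + " " + maxPerPool(3, 2));
    // 1 replica group across 3 pools: floor is 0, ceiling is 1.
    System.out.println(minPerPool(1, 3) + " " + maxPerPool(1, 3));
  }
}
```

On Java 18 and later, `Math.ceilDiv(numReplicaGroups, numPools)` should give the same ceiling without the manual rounding.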
##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java:
##########
@@ -56,6 +56,8 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig {
"Name of the column used for partition, if not provided table level replica group will be used")
private final String _partitionColumn;
+ // TODO: remove this config in the next official release
+ @Deprecated
Review Comment:
Just a question: we'll have to update all table configs on our end to remove
this once it is removed, right? Will existing tables fail if this field is
deleted in the next release while their table configs still set it in
`InstanceReplicaGroupPartitionConfig`?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -71,18 +74,114 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
+ // Get the maximum number of replica groups per pool.
+ int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size();
+ // Given a pool number, assign replica group which has the max number of existing instances.
+ // Repeat this process until the max number of replica groups per pool is reached.
+ while (!maxHeap.isEmpty()) {
+ Pairs.IntPair pair = maxHeap.remove();
+ int poolNumber = pair.getRight();
+ for (int i = 0; i < maxNumberOfReplicaGroupPerPool; i++) {
Review Comment:
Just wondering if there is a code simplification opportunity here. Instead
of running this outer loop, can you just extract the relevant group IDs
from `existingReplicaGroupIdToExistingInstancesMap`, sort them by size ascending,
and assign the top `maxNumberOfReplicaGroupPerPool` of them as target groups
(when that value is larger than 0)?
Also, if you do want to keep this for loop, you could move it after the
following check, right?
```
Set<Integer> existingReplicaGroups = existingPoolToExistingReplicaGroupIdsMap.get(poolNumber);
if (existingReplicaGroups == null || existingReplicaGroups.isEmpty()) {
  continue;
}
```
I don't see how the result of that check would change from one iteration of
the loop to the next.
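A rough sketch of the suggested simplification, under stated assumptions: `ReplicaGroupPicker` and `pickReplicaGroups` are hypothetical, and the two maps are assumed to have the same shapes as in the diff. Groups are sorted by existing-instance count in descending order to match the selector's own comment about picking the group with the most existing instances; the suggestion above says ascending, so the intended direction is worth confirming. The null/empty check runs once per pool, before any selection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ReplicaGroupPicker {
  // Hypothetical helper: picks up to `limit` existing replica groups for one pool,
  // preferring the groups that already hold the most instances.
  static List<Integer> pickReplicaGroups(int poolNumber, int limit,
      Map<Integer, Set<Integer>> existingPoolToExistingReplicaGroupIdsMap,
      Map<Integer, Set<String>> existingReplicaGroupIdToExistingInstancesMap) {
    // Loop-invariant check: evaluated once per pool rather than once per iteration.
    Set<Integer> existingReplicaGroups = existingPoolToExistingReplicaGroupIdsMap.get(poolNumber);
    if (limit <= 0 || existingReplicaGroups == null || existingReplicaGroups.isEmpty()) {
      return Collections.emptyList();
    }
    List<Integer> candidates = new ArrayList<>(existingReplicaGroups);
    // Most existing instances first; ties broken by replica group id so the result is deterministic.
    candidates.sort(Comparator
        .comparingInt((Integer rg) -> existingReplicaGroupIdToExistingInstancesMap
            .getOrDefault(rg, Collections.emptySet()).size())
        .reversed()
        .thenComparingInt(Integer::intValue));
    return new ArrayList<>(candidates.subList(0, Math.min(limit, candidates.size())));
  }

  public static void main(String[] args) {
    Map<Integer, Set<Integer>> poolToReplicaGroups = new HashMap<>();
    poolToReplicaGroups.put(0, new HashSet<>(Arrays.asList(0, 1, 2)));
    Map<Integer, Set<String>> replicaGroupToInstances = new HashMap<>();
    replicaGroupToInstances.put(0, new HashSet<>(Arrays.asList("Server_a")));
    replicaGroupToInstances.put(1, new HashSet<>(Arrays.asList("Server_b", "Server_c", "Server_d")));
    replicaGroupToInstances.put(2, new HashSet<>(Arrays.asList("Server_e", "Server_f")));
    // Pool 0 with room for 2 groups keeps the two best-populated ones: [1, 2].
    System.out.println(pickReplicaGroups(0, 2, poolToReplicaGroups, replicaGroupToInstances));
  }
}
```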
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -109,11 +123,46 @@ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> i
return poolToInstanceConfigsMap;
}
- // Select pools based on the table name hash to evenly distribute the tables
poolsToSelect = new ArrayList<>(numPoolsToSelect);
- List<Integer> poolsInCluster = new ArrayList<>(pools);
- for (int i = 0; i < numPoolsToSelect; i++) {
- poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+ if (_minimizeDataMovement && _existingInstancePartitions != null) {
+ Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
+ // Keep the same pool if it's already been used for the table.
+ int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+ int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+ for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+ for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+ List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+ for (String existingInstance : existingInstances) {
+ Integer existingPool = instanceToPoolMap.get(existingInstance);
+ if (existingPool != null) {
+ if (!existingPoolsToExistingInstancesMap.containsKey(existingPool)) {
+ existingPoolsToExistingInstancesMap.put(existingPool, new HashSet<>());
+ }
Review Comment:
You don't need this. You're already doing a `computeIfAbsent` on the next
line.
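For reference, a minimal sketch of the grouping step written with `computeIfAbsent` alone. `PoolInstanceStats.groupByPool` is a hypothetical helper, `instanceToPoolMap` is assumed to have the same shape as the map used in the diff, and the instance names in `main` are made up.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class PoolInstanceStats {
  // Hypothetical helper: groups existing instances by pool, skipping instances whose pool is unknown.
  static Map<Integer, Set<String>> groupByPool(Iterable<String> existingInstances,
      Map<String, Integer> instanceToPoolMap) {
    Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
    for (String existingInstance : existingInstances) {
      Integer existingPool = instanceToPoolMap.get(existingInstance);
      if (existingPool != null) {
        // computeIfAbsent creates the set on first use, so no containsKey()/put() guard is needed.
        existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
            .add(existingInstance);
      }
    }
    return existingPoolsToExistingInstancesMap;
  }

  public static void main(String[] args) {
    Map<String, Integer> instanceToPoolMap = new HashMap<>();
    instanceToPoolMap.put("Server_1", 0);
    instanceToPoolMap.put("Server_2", 1);
    instanceToPoolMap.put("Server_3", 0);
    // "Server_9" has no pool entry and is ignored.
    System.out.println(groupByPool(
        Arrays.asList("Server_1", "Server_2", "Server_3", "Server_9"), instanceToPoolMap));
  }
}
```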
--
This is an automated message from the Apache Git Service.