This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement in repository https://gitbox.apache.org/repos/asf/pinot.git
commit a286bd868f434c72e32351890566d684ec14a378 Author: Xiaotian (Jackie) Jiang <[email protected]> AuthorDate: Wed Jan 31 23:15:41 2024 -0800 Enhance algorithm --- .../instance/FDAwareInstancePartitionSelector.java | 7 +- .../instance/InstancePartitionSelector.java | 10 +- .../InstanceReplicaGroupPartitionSelector.java | 768 +++++++++++---------- .../instance/InstanceTagPoolSelector.java | 56 +- .../instance/InstanceAssignmentTest.java | 43 +- .../InstanceReplicaGroupPartitionSelectorTest.java | 141 ++-- .../java/org/apache/pinot/spi/utils/Pairs.java | 23 +- 7 files changed, 562 insertions(+), 486 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java index de96d4da4d..89d64272e3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java @@ -109,10 +109,7 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector return new ImmutablePair<>(numReplicaGroups, numInstancesPerReplicaGroup); } - /** - * Selects instances based on the replica-group/partition config, and stores the result into the given instance - * partitions. - */ + @Override public void selectInstances(Map<Integer, List<InstanceConfig>> faultDomainToInstanceConfigsMap, InstancePartitions instancePartitions) { @@ -152,7 +149,7 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector * initialize the new replicaGroupBasedAssignmentState for assignment, * place existing instances in their corresponding positions */ - if (_minimizeDataMovement && _existingInstancePartitions != null) { + if (_minimizeDataMovement) { int numExistingReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); int numExistingPartitions = _existingInstancePartitions.getNumPartitions(); /* diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java index 335070b003..b80ad8bba9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java @@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.assignment.instance; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; @@ -32,13 +33,14 @@ abstract class InstancePartitionSelector { protected final boolean _minimizeDataMovement; public InstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, - String tableNameWithType, InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) { + String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) { _replicaGroupPartitionConfig = replicaGroupPartitionConfig; _tableNameWithType = tableNameWithType; _existingInstancePartitions = existingInstancePartitions; - // For backward compatibility, enable minimize data movement when it is enabled in top level or instance - // partition selector level. - _minimizeDataMovement = minimizeDataMovement || replicaGroupPartitionConfig.isMinimizeDataMovement(); + // For backward compatibility, enable minimize data movement when it is enabled in top level or instance partition + // selector level + _minimizeDataMovement = (minimizeDataMovement || replicaGroupPartitionConfig.isMinimizeDataMovement()) + && existingInstancePartitions != null; } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java index 505006f1d3..8da6dbe2f6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java @@ -19,24 +19,22 @@ package org.apache.pinot.controller.helix.core.assignment.instance; import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import java.util.ArrayList; -import java.util.Collections; -import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.PriorityQueue; import java.util.Set; import java.util.TreeMap; import javax.annotation.Nullable; +import org.apache.commons.lang3.tuple.Triple; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; -import org.apache.pinot.spi.utils.Pairs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,10 +51,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement); } - /** - * Selects instances based on the replica-group/partition config, and stores the result into the given instance - * partitions. - */ + @Override public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap, InstancePartitions instancePartitions) { int numPools = poolToInstanceConfigsMap.size(); @@ -65,393 +60,448 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele int tableNameHash = Math.abs(_tableNameWithType.hashCode()); List<Integer> pools = new ArrayList<>(poolToInstanceConfigsMap.keySet()); pools.sort(null); - LOGGER.info("Starting instance replica-group/partition selection for table: {} with hash: {} from pools: {}", - _tableNameWithType, tableNameHash, pools); + LOGGER.info("Starting instance replica-group/partition selection for table: {} with hash: {} from pools: {}, " + + "minimize data movement: {}", _tableNameWithType, tableNameHash, pools, _minimizeDataMovement); if (_replicaGroupPartitionConfig.isReplicaGroupBased()) { - // Replica-group based selection - - int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups(); - Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive"); - Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>(); - Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>(); - Map<Integer, Set<Integer>> existingPoolToExistingReplicaGroupIdsMap = new TreeMap<>(); - Map<Integer, Set<String>> existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>(); - Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>(); - Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>(); - Map<String, Integer> instanceToPoolMap = new HashMap<>(); - for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) { - Integer pool = entry.getKey(); - List<InstanceConfig> instanceConfigsInPool = entry.getValue(); - Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>()); - for (InstanceConfig instanceConfig : instanceConfigsInPool) { - String instanceName = instanceConfig.getInstanceName(); - candidateInstances.add(instanceName); - instanceToPoolMap.put(instanceName, pool); - } + if (_minimizeDataMovement) { + replicaGroupBasedMinimumMovement(poolToInstanceConfigsMap, instancePartitions, pools, tableNameHash); + } else { + replicaGroupBasedSimple(poolToInstanceConfigsMap, instancePartitions, pools, tableNameHash); } + } else { + nonReplicaGroupBased(poolToInstanceConfigsMap, instancePartitions, pools, tableNameHash); + } + } - if (_minimizeDataMovement && _existingInstancePartitions != null) { - // Collect the stats between the existing pools, existing replica groups, and existing instances. - int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); - int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); - for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { - for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { - List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); - for (String existingInstance : existingInstances) { - Integer existingPool = instanceToPoolMap.get(existingInstance); - if (existingPool != null) { - existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>()) - .add(existingInstance); - existingPoolToExistingReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new HashSet<>()) - .add(replicaGroupId); - existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>()) - .add(existingInstance); - } - } - } - } - - // Use a max heap to track the number of servers used for the given pools, - // so that pool with max number of existing instances will be considered first. - PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false)); - for (int pool : pools) { - maxHeap.add( - new Pairs.IntPair(existingPoolsToExistingInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).size(), - pool)); - } + private void nonReplicaGroupBased(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap, + InstancePartitions instancePartitions, List<Integer> pools, int tableNameHash) { + // Pick one pool based on the table name hash + int pool = pools.get(Math.abs(tableNameHash % pools.size())); + LOGGER.info("Selecting pool: {} for table: {}", pool, _tableNameWithType); + List<InstanceConfig> instanceConfigs = poolToInstanceConfigsMap.get(pool); + int numInstances = instanceConfigs.size(); - // Get the maximum number of replica groups per pool. - int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size(); - // Given a pool number, assign replica group which has the max number of existing instances. - // Repeat this process until the max number of replica groups per pool is reached. - while (!maxHeap.isEmpty()) { - Pairs.IntPair pair = maxHeap.remove(); - int poolNumber = pair.getRight(); - for (int i = 0; i < maxNumberOfReplicaGroupPerPool; i++) { - Set<Integer> existingReplicaGroups = existingPoolToExistingReplicaGroupIdsMap.get(poolNumber); - if (existingReplicaGroups == null || existingReplicaGroups.isEmpty()) { - continue; - } - int targetReplicaGroupId = -1; - int maxNumInstances = 0; - for (int existingReplicaGroupId : existingReplicaGroups) { - int numExistingInstances = - existingReplicaGroupIdToExistingInstancesMap.getOrDefault(existingReplicaGroupId, new HashSet<>()) - .size(); - if (numExistingInstances > maxNumInstances) { - maxNumInstances = numExistingInstances; - targetReplicaGroupId = existingReplicaGroupId; - } - } - // If target existing replica group cannot be found, it means it should be chosen from a new replica group. - if (targetReplicaGroupId > -1) { - poolToReplicaGroupIdsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(targetReplicaGroupId); - replicaGroupIdToPoolMap.put(targetReplicaGroupId, poolNumber); - // Clear the stats so that the same replica group won't be picked up again in later iteration. - existingReplicaGroupIdToExistingInstancesMap.get(targetReplicaGroupId).clear(); - } - } - } + // Assign all instances if not configured + int numInstancesToSelect = _replicaGroupPartitionConfig.getNumInstances(); + if (numInstancesToSelect > 0) { + Preconditions.checkState(numInstancesToSelect <= numInstances, + "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstances, + numInstancesToSelect); + } else { + numInstancesToSelect = numInstances; + } - // If there is any new replica group added, choose pool which is least frequently picked up. - // Use a min heap to track the least frequently picked pool among all the pools. - PriorityQueue<Pairs.IntPair> minHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator()); - for (int pool : pools) { - int numExistingReplicaGroups = - poolToReplicaGroupIdsMap.get(pool) != null ? poolToReplicaGroupIdsMap.get(pool).size() : 0; - minHeap.add(new Pairs.IntPair(numExistingReplicaGroups, pool)); - } - for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) { - if (replicaGroupIdToPoolMap.containsKey(replicaId)) { - continue; - } - // Increment the frequency for a given pool and put it back to the min heap to rotate the pool selection. - Pairs.IntPair pair = minHeap.remove(); - int pool = pair.getRight(); - pair.setLeft(pair.getLeft() + 1); - minHeap.add(pair); - poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId); - replicaGroupIdToPoolMap.put(replicaId, pool); - } - } else { - // Current default way to assign pool to replica groups. - for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) { - // Pick one pool for each replica-group based on the table name hash - int pool = pools.get((tableNameHash + replicaId) % numPools); - poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId); - replicaGroupIdToPoolMap.put(replicaId, pool); - } - } - LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap, - _tableNameWithType); - - int numInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup(); - if (numInstancesPerReplicaGroup > 0) { - // Check if we have enough instances if number of instances per replica-group is configured - for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) { - int pool = entry.getKey(); - int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size(); - int numInstancesToSelect = numInstancesPerReplicaGroup * entry.getValue().size(); - Preconditions.checkState(numInstancesToSelect <= numInstancesInPool, - "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstancesInPool, - numInstancesToSelect); - } - } else { - // Use as many instances as possible if number of instances per replica-group is not configured - numInstancesPerReplicaGroup = Integer.MAX_VALUE; - for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) { - int pool = entry.getKey(); - int numReplicaGroupsInPool = entry.getValue().size(); - int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size(); - Preconditions.checkState(numReplicaGroupsInPool <= numInstancesInPool, - "Not enough qualified instances from pool: %s, cannot select %s replica-groups from %s instances", pool, - numReplicaGroupsInPool, numInstancesInPool); - numInstancesPerReplicaGroup = - Math.min(numInstancesPerReplicaGroup, numInstancesInPool / numReplicaGroupsInPool); - } + List<String> instancesToSelect; + if (_minimizeDataMovement) { + List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0); + LinkedHashSet<String> candidateInstances = Sets.newLinkedHashSetWithExpectedSize(instanceConfigs.size()); + instanceConfigs.forEach(k -> candidateInstances.add(k.getInstanceName())); + instancesToSelect = + selectInstancesWithMinimumMovement(numInstancesToSelect, candidateInstances, existingInstances); + LOGGER.info("Selecting instances: {} for table: {}, existing instances: {}", instancesToSelect, + _tableNameWithType, existingInstances); + } else { + instancesToSelect = new ArrayList<>(numInstancesToSelect); + for (int i = 0; i < numInstancesToSelect; i++) { + instancesToSelect.add(instanceConfigs.get(i).getInstanceName()); } - LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup, - _tableNameWithType); + LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType); + } + // Set the instances as partition 0 replica 0 + instancePartitions.setInstances(0, 0, instancesToSelect); + } + + /** + * Selects the instances with minimum movement. + * For each instance in the existing instances, if it is still alive, keep it in the same position. Then fill the + * vacant positions with the remaining candidate instances. + * NOTE: This method will modify the candidate instances. + */ + private static List<String> selectInstancesWithMinimumMovement(int numInstancesToSelect, + LinkedHashSet<String> candidateInstances, List<String> existingInstances) { + // Initialize the list with empty positions to fill + List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect); + for (int i = 0; i < numInstancesToSelect; i++) { + instancesToSelect.add(null); + } - // Assign instances within a replica-group to one partition if not configured - int numPartitions = _replicaGroupPartitionConfig.getNumPartitions(); - if (numPartitions <= 0) { - numPartitions = 1; + // Keep the existing instances that are still alive + int numInstancesToCheck = Math.min(numInstancesToSelect, existingInstances.size()); + for (int i = 0; i < numInstancesToCheck; i++) { + String existingInstance = existingInstances.get(i); + if (candidateInstances.remove(existingInstance)) { + instancesToSelect.set(i, existingInstance); } - // Assign all instances within a replica-group to each partition if not configured - int numInstancesPerPartition = _replicaGroupPartitionConfig.getNumInstancesPerPartition(); - if (numInstancesPerPartition > 0) { - Preconditions.checkState(numInstancesPerPartition <= numInstancesPerReplicaGroup, - "Number of instances per partition: %s must be smaller or equal to number of instances per replica-group:" - + " %s", numInstancesPerPartition, numInstancesPerReplicaGroup); - } else { - numInstancesPerPartition = numInstancesPerReplicaGroup; + } + + // Fill the vacant positions with the remaining candidate instances + Iterator<String> iterator = candidateInstances.iterator(); + for (int i = 0; i < numInstancesToSelect; i++) { + if (instancesToSelect.get(i) == null) { + instancesToSelect.set(i, iterator.next()); } - LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}", - numPartitions, numInstancesPerPartition, _tableNameWithType); - - if (_minimizeDataMovement && _existingInstancePartitions != null) { - // Minimize data movement. - int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); - int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); - int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups); - - existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>(); - // Step 1: find out the replica groups and their existing instances, - // so that these instances can be filtered out and won't be chosen for the other replica group. - for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) { - Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId); - if (pool == null) { - // Skip the replica group if it's no longer needed. - continue; - } + } - for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { - List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); - existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>()) - .addAll(existingInstances); - } - } + return instancesToSelect; + } - for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) { - Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId); - // Step 2: filter out instances that belong to other replica groups which should not be the candidate. - LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool)); - for (int otherReplicaGroupId = 0; - otherReplicaGroupId < existingNumReplicaGroups && otherReplicaGroupId < numReplicaGroups; - otherReplicaGroupId++) { - if (replicaGroupId != otherReplicaGroupId) { - candidateInstances.removeAll(existingReplicaGroupIdToExistingInstancesMap.get(otherReplicaGroupId)); - } - } - LinkedHashSet<String> chosenCandidateInstances = new LinkedHashSet<>(); - for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { - List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); - // Step 3: figure out the missing instances and the new instances to fill their vacant positions. - List<String> instancesToSelect = - getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances); - chosenCandidateInstances.addAll(instancesToSelect); - instancePartitions.setInstances(partitionId, replicaGroupId, instancesToSelect); - } - // Remove instances that are already been chosen. - poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances); - } + private void replicaGroupBasedSimple(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap, + InstancePartitions instancePartitions, List<Integer> pools, int tableNameHash) { + int numPools = pools.size(); + int numReplicaGroups = getNumReplicaGroups(); - // If the new number of replica groups is greater than the existing number of replica groups. - for (int replicaGroupId = existingNumReplicaGroups; replicaGroupId < numReplicaGroups; replicaGroupId++) { - int pool = replicaGroupIdToPoolMap.get(replicaGroupId); - LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool)); - - Set<String> chosenCandidateInstances = new HashSet<>(); - for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { - List<String> existingInstances = Collections.emptyList(); - List<String> instancesToSelect = - getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances); - chosenCandidateInstances.addAll(instancesToSelect); - instancePartitions.setInstances(partitionId, replicaGroupId, instancesToSelect); - } - // Remove instances that are already been chosen. - poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances); - } - } else { - // Pick instances based on the sorted list of instance names. - String[][] replicaGroupIdToInstancesMap = new String[numReplicaGroups][numInstancesPerReplicaGroup]; - for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) { - List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(entry.getKey()); - List<Integer> replicaGroupIdsInPool = entry.getValue(); - - // Use round-robin to assign instances to each replica-group so that they get instances with similar picking - // priority - // E.g. (within a pool, 10 instances, 2 replica-groups, 3 instances per replica-group) - // [i0, i1, i2, i3, i4, i5, i6, i7, i8, i9] - // r0 r1 r0 r1 r0 r1 - int instanceIdInPool = 0; - for (int instanceIdInReplicaGroup = 0; instanceIdInReplicaGroup < numInstancesPerReplicaGroup; - instanceIdInReplicaGroup++) { - for (int replicaGroupId : replicaGroupIdsInPool) { - replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup] = - instanceConfigsInPool.get(instanceIdInPool++).getInstanceName(); - } - } + // Pick one pool for each replica-group based on the table name hash + Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>(); + int startIndex = Math.abs(tableNameHash % numPools); + for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { + int pool = pools.get((startIndex + replicaGroupId) % numPools); + poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaGroupId); + } + LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap, + _tableNameWithType); + + int numInstancesPerReplicaGroup = + getNumInstancesPerReplicaGroup(poolToInstanceConfigsMap, poolToReplicaGroupIdsMap); + LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup, + _tableNameWithType); + int numPartitions = getNumPartitions(); + int numInstancesPerPartition = getNumInstancesPerPartition(numInstancesPerReplicaGroup); + LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}", + numPartitions, numInstancesPerPartition, _tableNameWithType); + + // Pick instances based on the sorted list of instance names + String[][] replicaGroupIdToInstancesMap = new String[numReplicaGroups][numInstancesPerReplicaGroup]; + for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) { + List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(entry.getKey()); + List<Integer> replicaGroupIdsInPool = entry.getValue(); + + // Use round-robin to assign instances to each replica-group so that they get instances with similar picking + // priority + // E.g. (within a pool, 10 instances, 2 replica-groups, 3 instances per replica-group) + // [i0, i1, i2, i3, i4, i5, i6, i7, i8, i9] + // r0 r1 r0 r1 r0 r1 + int instanceIdInPool = 0; + for (int instanceIdInReplicaGroup = 0; instanceIdInReplicaGroup < numInstancesPerReplicaGroup; + instanceIdInReplicaGroup++) { + for (int replicaGroupId : replicaGroupIdsInPool) { + replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup] = + instanceConfigsInPool.get(instanceIdInPool++).getInstanceName(); } + } + } - // Assign consecutive instances within a replica-group to each partition. - // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition) - // [i0, i1, i2, i3, i4] - // p0 p0 p0 p1 p1 - // p1 p2 p2 p2 - for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { - int instanceIdInReplicaGroup = 0; - for (int partitionId = 0; partitionId < numPartitions; partitionId++) { - List<String> instancesInPartition = new ArrayList<>(numInstancesPerPartition); - for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition; - instanceIdInPartition++) { - instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]); - instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup; - } - LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}", - instancesInPartition, replicaGroupId, partitionId, _tableNameWithType); - instancePartitions.setInstances(partitionId, replicaGroupId, instancesInPartition); - } + // Assign consecutive instances within a replica-group to each partition + // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition) + // [i0, i1, i2, i3, i4] + // p0 p0 p0 p1 p1 + // p1 p2 p2 p2 + for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { + String[] instancesInReplicaGroup = replicaGroupIdToInstancesMap[replicaGroupId]; + int instanceIdInReplicaGroup = 0; + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + List<String> instances = new ArrayList<>(numInstancesPerPartition); + for (int i = 0; i < numInstancesPerPartition; i++) { + instances.add(instancesInReplicaGroup[instanceIdInReplicaGroup]); + instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup; } + LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}", instances, + replicaGroupId, partitionId, _tableNameWithType); + instancePartitions.setInstances(partitionId, replicaGroupId, instances); } - } else { - // Non-replica-group based selection - - // Pick one pool based on the table name hash - int pool = pools.get(tableNameHash % numPools); - LOGGER.info("Selecting pool: {} for table: {}", pool, _tableNameWithType); - List<InstanceConfig> instanceConfigs = poolToInstanceConfigsMap.get(pool); - int numInstanceConfigs = instanceConfigs.size(); - - // Assign all instances if not configured - int numInstancesToSelect = _replicaGroupPartitionConfig.getNumInstances(); - if (numInstancesToSelect > 0) { - Preconditions.checkState(numInstancesToSelect <= numInstanceConfigs, - "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstanceConfigs, + } + } + + private int getNumReplicaGroups() { + int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups(); + Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive"); + return numReplicaGroups; + } + + private int getNumInstancesPerReplicaGroup(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap, + Map<Integer, List<Integer>> poolToReplicaGroupIdsMap) { + int numInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup(); + if (numInstancesPerReplicaGroup > 0) { + // Check if we have enough instances if number of instances per replica-group is configured + for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) { + int pool = entry.getKey(); + int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size(); + int numInstancesToSelect = numInstancesPerReplicaGroup * entry.getValue().size(); + Preconditions.checkState(numInstancesToSelect <= numInstancesInPool, + "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstancesInPool, numInstancesToSelect); - } else { - numInstancesToSelect = numInstanceConfigs; } - - List<String> instancesToSelect; - if (_minimizeDataMovement && _existingInstancePartitions != null) { - // Minimize data movement. - List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0); - LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(); - instanceConfigs.forEach(k -> candidateInstances.add(k.getInstanceName())); - instancesToSelect = - getInstancesWithMinimumMovement(numInstancesToSelect, candidateInstances, existingInstances); - } else { - // Select instances sequentially. - instancesToSelect = new ArrayList<>(numInstancesToSelect); - for (int i = 0; i < numInstancesToSelect; i++) { - instancesToSelect.add(instanceConfigs.get(i).getInstanceName()); - } + } else { + // Use as many instances as possible if number of instances per replica-group is not configured + numInstancesPerReplicaGroup = Integer.MAX_VALUE; + for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) { + int pool = entry.getKey(); + int numReplicaGroupsInPool = entry.getValue().size(); + int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size(); + Preconditions.checkState(numReplicaGroupsInPool <= numInstancesInPool, + "Not enough qualified instances from pool: %s, cannot select %s replica-groups from %s instances", pool, + numReplicaGroupsInPool, numInstancesInPool); + numInstancesPerReplicaGroup = + Math.min(numInstancesPerReplicaGroup, numInstancesInPool / numReplicaGroupsInPool); } - LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType); - // Set the instances as partition 0 replica 0 - instancePartitions.setInstances(0, 0, instancesToSelect); } + return numInstancesPerReplicaGroup; } - /** - * Select instances with minimum movement. - * This algorithm can solve the following scenarios: - * * swap an instance - * * add/remove replica groups - * * increase/decrease number of instances per replica group - * TODO: handle the scenarios that selected pools are changed. - * TODO: improve the algorithm by doing the following steps: - * 1. assign the existing instances for all partitions; - * 2. assign the vacant positions based on the partitions already assigned to each instance. - * @param numInstancesToSelect number of instances to select - * @param candidateInstances candidate instances to be selected - * @param existingInstances list of existing instances - */ - private static List<String> getInstancesWithMinimumMovement(int numInstancesToSelect, - LinkedHashSet<String> candidateInstances, List<String> existingInstances) { - // Initialize the list with empty positions to fill. - List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect); - for (int i = 0; i < numInstancesToSelect; i++) { - instancesToSelect.add(null); + private int getNumPartitions() { + // Assign instances within a replica-group to one partition if not configured + int numPartitions = _replicaGroupPartitionConfig.getNumPartitions(); + if (numPartitions <= 0) { + numPartitions = 1; } - Deque<String> newlyAddedInstances = new LinkedList<>(); + return numPartitions; + } + + private int getNumInstancesPerPartition(int numInstancesPerReplicaGroup) { + // Assign all instances within a replica-group to each partition if not configured + int numInstancesPerPartition = _replicaGroupPartitionConfig.getNumInstancesPerPartition(); + if (numInstancesPerPartition > 0) { + Preconditions.checkState(numInstancesPerPartition <= numInstancesPerReplicaGroup, + "Number of instances per partition: %s must be smaller or equal to number of instances per replica-group: %s", + numInstancesPerPartition, numInstancesPerReplicaGroup); + } else { + numInstancesPerPartition = numInstancesPerReplicaGroup; + } + return numInstancesPerPartition; + } + + private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap, + InstancePartitions instancePartitions, List<Integer> pools, int tableNameHash) { + int numPools = pools.size(); + int numReplicaGroups = getNumReplicaGroups(); - // Find out the existing instances that are still alive. - Set<String> existingInstancesStillAlive = new HashSet<>(); - for (String existingInstance : existingInstances) { - if (candidateInstances.contains(existingInstance)) { - existingInstancesStillAlive.add(existingInstance); + Map<String, Integer> instanceToPoolMap = new HashMap<>(); + for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) { + int pool = entry.getKey(); + for (InstanceConfig instanceConfig : entry.getValue()) { + instanceToPoolMap.put(instanceConfig.getInstanceName(), pool); } } - // Find out the newly added instances. - for (String candidateInstance : candidateInstances) { - if (!existingInstancesStillAlive.contains(candidateInstance)) { - newlyAddedInstances.add(candidateInstance); + // Calculate the mapping from pool to replica-groups assigned to the pool + List<Set<String>> replicaGroupIdToExistingInstancesMap = new ArrayList<>(numReplicaGroups); + Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>(); + int maxReplicaGroupsPerPool = (numReplicaGroups + numPools - 1) / numPools; + int startIndex = Math.abs(tableNameHash % numPools); + + int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); + int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); + for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { + // For each replica-group, gather number of existing instances within each pool + Set<String> existingInstanceSet = new HashSet<>(); + replicaGroupIdToExistingInstancesMap.add(existingInstanceSet); + Map<Integer, Integer> poolToNumExistingInstancesMap = new TreeMap<>(); + if (replicaGroupId < existingNumReplicaGroups) { + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + existingInstanceSet.addAll(existingInstances); + for (String existingInstance : existingInstances) { + Integer existingPool = instanceToPoolMap.get(existingInstance); + if (existingPool != null) { + poolToNumExistingInstancesMap.merge(existingPool, 1, Integer::sum); + } + } + } + } + // Sort the pools based on the number of existing instances in the pool in descending order, then use the table + // name hash to break even + // Triple stores (pool, numExistingInstances, poolIndex) for sorting + List<Triple<Integer, Integer, Integer>> triples = new ArrayList<>(numPools); + for (int i = 0; i < numPools; i++) { + int pool = pools.get((startIndex + replicaGroupId + i) % numPools); + triples.add(Triple.of(pool, poolToNumExistingInstancesMap.getOrDefault(pool, 0), i)); + } + triples.sort((o1, o2) -> { + int result = Integer.compare(o2.getMiddle(), o1.getMiddle()); + return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight()); + }); + for (Triple<Integer, Integer, Integer> triple : triples) { + int pool = triple.getLeft(); + List<Integer> replicaGroupIds = poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()); + if (replicaGroupIds.size() < maxReplicaGroupsPerPool) { + replicaGroupIds.add(replicaGroupId); + break; + } } } + LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap, + _tableNameWithType); - int numExistingInstances = existingInstances.size(); - for (int i = 0; i < numInstancesToSelect; i++) { - String existingInstance = i < numExistingInstances ? existingInstances.get(i) : null; - String selectedInstance; - if (existingInstance != null && candidateInstances.contains(existingInstance)) { - selectedInstance = existingInstance; - existingInstancesStillAlive.remove(selectedInstance); - } else { - selectedInstance = newlyAddedInstances.poll(); + int numInstancesPerReplicaGroup = + getNumInstancesPerReplicaGroup(poolToInstanceConfigsMap, poolToReplicaGroupIdsMap); + LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup, + _tableNameWithType); + int numPartitions = getNumPartitions(); + int numInstancesPerPartition = getNumInstancesPerPartition(numInstancesPerReplicaGroup); + LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}", + numPartitions, numInstancesPerPartition, _tableNameWithType); + + List<List<String>> replicaGroupIdToInstancesMap = new ArrayList<>(numReplicaGroups); + for (int i = 0; i < numReplicaGroups; i++) { + replicaGroupIdToInstancesMap.add(new ArrayList<>(numInstancesPerReplicaGroup)); + } + for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) { + // For each pool, keep the existing instances that are still alive within each replica-group + int pool = entry.getKey(); + List<Integer> replicaGroupIds = entry.getValue(); + List<String> newInstances = new ArrayList<>(); + for (InstanceConfig instanceConfig : poolToInstanceConfigsMap.get(pool)) { + String instanceName = instanceConfig.getInstanceName(); + boolean isExistingInstance = false; + for (int replicaGroupId : replicaGroupIds) { + List<String> instances = replicaGroupIdToInstancesMap.get(replicaGroupId); + if (instances.size() == numInstancesPerReplicaGroup) { + continue; + } + if (replicaGroupIdToExistingInstancesMap.get(replicaGroupId).contains(instanceName)) { + instances.add(instanceName); + isExistingInstance = true; + break; + } + } + if (!isExistingInstance) { + newInstances.add(instanceName); + } } - instancesToSelect.set(i, selectedInstance); - // If it's an existing alive instance, or it's for a new replica group, add the new instance to the tail, - // so that it won't be firstly chosen for the next partition. - // For newly added instances to fill the existing replica group, the sequence cannot change; - // otherwise there is no guarantee that same vacant position will be filled with the same new instance. - // The 'selectedInstance' object can still be null if there is no new instances from the candidate list. - if (selectedInstance != null && (i < numExistingInstances || existingInstances.isEmpty())) { - candidateInstances.remove(selectedInstance); - candidateInstances.add(selectedInstance); + // Fill the vacant positions with the new instances. First fill the replica groups with the least instances, then + // use round-robin to assign instances to each replica-group so that they get instances with similar picking + // priority. + int numInstancesToFill = numInstancesPerReplicaGroup * replicaGroupIds.size(); + for (int replicaGroupId : replicaGroupIds) { + numInstancesToFill -= replicaGroupIdToInstancesMap.get(replicaGroupId).size(); + } + for (int i = 0; i < numInstancesToFill; i++) { + int leastNumInstances = Integer.MAX_VALUE; + int replicaGroupIdWithLeastInstances = -1; + for (int replicaGroupId : replicaGroupIds) { + int numInstances = replicaGroupIdToInstancesMap.get(replicaGroupId).size(); + if (numInstances < leastNumInstances) { + leastNumInstances = numInstances; + replicaGroupIdWithLeastInstances = replicaGroupId; + } + } + replicaGroupIdToInstancesMap.get(replicaGroupIdWithLeastInstances).add(newInstances.get(i)); } } - // If there are still some vacant positions in the instance list, - // try to fill with instances which are either left over or newly added. - for (int i = 0; i < instancesToSelect.size(); i++) { - if (instancesToSelect.get(i) == null) { - if (!existingInstancesStillAlive.isEmpty()) { - Iterator<String> iterator = existingInstancesStillAlive.iterator(); - String existingInstanceLeftOver = iterator.next(); - instancesToSelect.set(i, existingInstanceLeftOver); - iterator.remove(); - } else if (!newlyAddedInstances.isEmpty()) { - // pick a new instance to fill its vacant position. - String newInstance = newlyAddedInstances.pollFirst(); - instancesToSelect.set(i, newInstance); + if (numPartitions == 1) { + for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { + List<String> instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId); + if (replicaGroupId < existingNumReplicaGroups) { + List<String> existingInstances = _existingInstancePartitions.getInstances(0, replicaGroupId); + LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(instancesInReplicaGroup); + List<String> instances = + selectInstancesWithMinimumMovement(numInstancesPerReplicaGroup, candidateInstances, existingInstances); + LOGGER.info( + "Selecting instances: {} for replica-group: {}, partition: 0 for table: {}, existing instances: {}", + instances, replicaGroupId, _tableNameWithType, existingInstances); + instancePartitions.setInstances(0, replicaGroupId, instances); + } else { + LOGGER.info("Selecting instances: {} for replica-group: {}, partition: 0 for table: {}, " + + "there is no existing instances", instancesInReplicaGroup, replicaGroupId, _tableNameWithType); + instancePartitions.setInstances(0, replicaGroupId, instancesInReplicaGroup); + } + } + } else { + for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { + List<String> instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId); + if (replicaGroupId < existingNumReplicaGroups) { + int maxNumPartitionsPerInstance = (numInstancesPerReplicaGroup + numPartitions - 1) / numPartitions; + Map<String, Integer> instanceToNumPartitionsMap = + Maps.newHashMapWithExpectedSize(numInstancesPerReplicaGroup); + for (String instance : instancesInReplicaGroup) { + instanceToNumPartitionsMap.put(instance, 0); + } + + List<List<String>> partitionIdToInstancesMap = new ArrayList<>(numPartitions); + List<Set<String>> partitionIdToInstanceSetMap = new ArrayList<>(numPartitions); + List<List<String>> partitionIdToExistingInstancesMap = new ArrayList<>(existingNumPartitions); + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + // Initialize the list with empty positions to fill + List<String> instances = new ArrayList<>(numInstancesPerPartition); + for (int i = 0; i < numInstancesPerPartition; i++) { + instances.add(null); + } + partitionIdToInstancesMap.add(instances); + Set<String> instanceSet = Sets.newHashSetWithExpectedSize(numInstancesPerPartition); + partitionIdToInstanceSetMap.add(instanceSet); + + // Keep the existing instances that are still alive + if (partitionId < existingNumPartitions) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + partitionIdToExistingInstancesMap.add(existingInstances); + int numInstancesToCheck = Math.min(numInstancesPerPartition, existingInstances.size()); + for (int i = 0; i < numInstancesToCheck; i++) { + String existingInstance = existingInstances.get(i); + Integer numPartitionsOnInstance = instanceToNumPartitionsMap.get(existingInstance); + if (numPartitionsOnInstance != null && numPartitionsOnInstance < maxNumPartitionsPerInstance) { + instances.set(i, existingInstance); + instanceSet.add(existingInstance); + instanceToNumPartitionsMap.put(existingInstance, numPartitionsOnInstance + 1); + } + } + } + } + + // Fill the vacant positions with instance that serves the least partitions + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + List<String> instances = partitionIdToInstancesMap.get(partitionId); + Set<String> instanceSet = partitionIdToInstanceSetMap.get(partitionId); + int numInstancesToFill = numInstancesPerPartition - instanceSet.size(); + if (numInstancesToFill > 0) { + // Triple stores (instance, numPartitionsOnInstance, instanceIndex) for sorting + List<Triple<String, Integer, Integer>> triples = new ArrayList<>(numInstancesPerReplicaGroup); + for (int i = 0; i < numInstancesPerReplicaGroup; i++) { + String instance = instancesInReplicaGroup.get(i); + if (!instanceSet.contains(instance)) { + triples.add(Triple.of(instance, instanceToNumPartitionsMap.get(instance), i)); + } + } + triples.sort((o1, o2) -> { + int result = Integer.compare(o1.getMiddle(), o2.getMiddle()); + return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight()); + }); + int instanceIdToFill = 0; + for (int i = 0; i < numInstancesPerPartition; i++) { + if (instances.get(i) == null) { + String instance = triples.get(instanceIdToFill++).getLeft(); + instances.set(i, instance); + instanceToNumPartitionsMap.put(instance, instanceToNumPartitionsMap.get(instance) + 1); + } + } + } + + if (partitionId < existingNumPartitions) { + LOGGER.info( + "Selecting instances: {} for replica-group: {}, partition: {} for table: {}, existing instances: {}", + instances, replicaGroupId, partitionId, _tableNameWithType, + partitionIdToExistingInstancesMap.get(partitionId)); + } else { + LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}, " + + "there is no existing instances", instances, replicaGroupId, partitionId, _tableNameWithType); + } + instancePartitions.setInstances(partitionId, replicaGroupId, instances); + } + } else { + // Assign consecutive instances within a replica-group to each partition + int instanceIdInReplicaGroup = 0; + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + List<String> instances = new ArrayList<>(numInstancesPerPartition); + for (int i = 0; i < numInstancesPerPartition; i++) { + instances.add(instancesInReplicaGroup.get(instanceIdInReplicaGroup)); + instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup; + } + LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}, " + + "there is no existing instances", instances, replicaGroupId, partitionId, _tableNameWithType); + instancePartitions.setInstances(partitionId, replicaGroupId, instances); + } } } } - return instancesToSelect; } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java index 940968432b..28d58bbbcd 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java @@ -22,18 +22,17 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.PriorityQueue; import java.util.Set; import java.util.TreeMap; import javax.annotation.Nullable; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.tuple.Triple; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.utils.config.InstanceUtils; import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig; -import org.apache.pinot.spi.utils.Pairs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,16 +45,14 @@ public class InstanceTagPoolSelector { private final InstanceTagPoolConfig _tagPoolConfig; private final String _tableNameWithType; - private final boolean _minimizeDataMovement; - private final InstancePartitions _existingInstancePartitions; public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType, boolean minimizeDataMovement, @Nullable InstancePartitions existingInstancePartitions) { _tagPoolConfig = tagPoolConfig; _tableNameWithType = tableNameWithType; - _minimizeDataMovement = minimizeDataMovement; + _minimizeDataMovement = minimizeDataMovement && existingInstancePartitions != null; _existingInstancePartitions = existingInstancePartitions; } @@ -104,7 +101,7 @@ public class InstanceTagPoolSelector { // Calculate the pools to select based on the selection config Set<Integer> pools = poolToInstanceConfigsMap.keySet(); List<Integer> poolsToSelect = _tagPoolConfig.getPools(); - if (poolsToSelect != null && !poolsToSelect.isEmpty()) { + if (!CollectionUtils.isEmpty(poolsToSelect)) { Preconditions.checkState(pools.containsAll(poolsToSelect), "Cannot find all instance pools configured: %s", poolsToSelect); } else { @@ -123,45 +120,44 @@ public class InstanceTagPoolSelector { return poolToInstanceConfigsMap; } + // Select pools based on the table name hash to evenly distribute the tables + List<Integer> poolsInCluster = new ArrayList<>(pools); + int startIndex = Math.abs(tableNameHash % numPools); poolsToSelect = new ArrayList<>(numPoolsToSelect); - if (_minimizeDataMovement && _existingInstancePartitions != null) { - Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>(); - // Keep the same pool if it's already been used for the table. + if (_minimizeDataMovement) { + assert _existingInstancePartitions != null; + Map<Integer, Integer> poolToNumExistingInstancesMap = new TreeMap<>(); int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); - for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { - for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); for (String existingInstance : existingInstances) { Integer existingPool = instanceToPoolMap.get(existingInstance); if (existingPool != null) { - if (!existingPoolsToExistingInstancesMap.containsKey(existingPool)) { - existingPoolsToExistingInstancesMap.put(existingPool, new HashSet<>()); - } - existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>()) - .add(existingInstance); + poolToNumExistingInstancesMap.merge(existingPool, 1, Integer::sum); } } } } - - // Use a max heap to track the number of servers used for all the pools. - PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false)); - for (int pool : pools) { - maxHeap.add(new Pairs.IntPair(existingPoolsToExistingInstancesMap.get(pool).size(), pool)); + // Sort the pools based on the number of existing instances in the pool in descending order, then use the + // table name hash to break even + // Triple stores (pool, numExistingInstances, poolIndex) for sorting + List<Triple<Integer, Integer, Integer>> triples = new ArrayList<>(numPools); + for (int i = 0; i < numPools; i++) { + int pool = poolsInCluster.get((startIndex + i) % numPools); + triples.add(Triple.of(pool, poolToNumExistingInstancesMap.getOrDefault(pool, 0), i)); } - - // Pick the pools from the max heap, so that data movement be minimized. + triples.sort((o1, o2) -> { + int result = Integer.compare(o2.getMiddle(), o1.getMiddle()); + return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight()); + }); for (int i = 0; i < numPoolsToSelect; i++) { - Pairs.IntPair pair = maxHeap.remove(); - poolsToSelect.add(pair.getRight()); + poolsToSelect.add(triples.get(i).getLeft()); } - LOGGER.info("The selected pools: " + poolsToSelect); } else { - // Select pools based on the table name hash to evenly distribute the tables - List<Integer> poolsInCluster = new ArrayList<>(pools); for (int i = 0; i < numPoolsToSelect; i++) { - poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools)); + poolsToSelect.add(poolsInCluster.get((startIndex + i) % numPools)); } } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java index a6220c00a2..113d4e1649 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java @@ -50,7 +50,9 @@ import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.Test; -import static org.testng.Assert.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.fail; public class InstanceAssignmentTest { @@ -198,8 +200,8 @@ public class InstanceAssignmentTest { // r0: [i8, i1, i4] // p0, p0, p1 // p1 - // r1: [i9, i10, i5] - // p0, p0, p1 + // r1: [i9, i5, i10] + // p0, p1, p0 // p1 // r2: [i0, i3, i11] // p0, p0, p1 @@ -217,7 +219,7 @@ public class InstanceAssignmentTest { assertEquals(instancePartitions.getInstances(1, 2), Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 0)); - // Add 2 more instances to the ZK and increase the number of instances per replica group from 2 to 3. + // Add 2 more instances to the ZK and increase the number of instances per partition from 2 to 3. for (int i = numInstances + 2; i < numInstances + 4; i++) { InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); instanceConfig.addTag(OFFLINE_TAG); @@ -233,34 +235,29 @@ public class InstanceAssignmentTest { // Math.abs("myTable_OFFLINE".hashCode()) % 12 = 2 // [i10, i11, i12, i13, i3, i4, i5, i11, i7, i8, i9, i0, i1] - // For r0, the candidate instances are [i12, i13, i4, i7, i8, i1]. - // For p0, since the existing assignment is [i8, i1], the next available instance from the candidates is i12. - // For p1, the existing assignment is [i4, i8], the next available instance is also i12. - // r0: [i12, i4, i8, i1] - // For r1, the candidate instances become [i10, i13, i5, i7, i9]. - // For p0, since the existing assignment is [i9, i10], the next available instance is i13 (new instance). - // For p1, the existing assignment is [i5, i9], the next available one from the candidates is i10, but since - // i10 is already used in the former partition, it got added to the tail, so the next available one is i13. - // r1: [i10, i13, i5, i9] - // For r2, the candidate instances become [i11, i3, i7, i0]. - // For p0, the existing assignment is [i0, i3], the next available instance from the candidates is i11. - // For p1, the existing assignment is [i11, i0], the next available instance from the candidates is i3, but - // since i3 is already used in the former partition, it got appended to the tail, so the next available one is i7. - // r2: [i11, i3, i7, i0] + // r0: [i8, i1, i4, i12] + // p0, p0, p1, p0 + // p1, p1 + // r1: [i9, i5, i10, i13] + // p0, p1, p0, p0 + // p1, p1 + // r2: [i0, i3, i11, i7] + // p0, p0, p1, p0 + // p1, p1 assertEquals(instancePartitions.getInstances(0, 0), Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 12)); assertEquals(instancePartitions.getInstances(1, 0), - Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 12)); + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 1)); assertEquals(instancePartitions.getInstances(0, 1), Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13)); assertEquals(instancePartitions.getInstances(1, 1), - Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 13)); + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 10)); assertEquals(instancePartitions.getInstances(0, 2), - Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 11)); + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 7)); assertEquals(instancePartitions.getInstances(1, 2), - Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 7)); + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3)); - // Reduce the number of instances per replica group from 3 to 2. + // Reduce the number of instances per partition from 3 to 2. numInstancesPerPartition = 2; tableConfig.getValidationConfig() .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerPartition)); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java index fdb6292f26..288b789aee 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java @@ -31,36 +31,66 @@ import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; + public class InstanceReplicaGroupPartitionSelectorTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + //@formatter:off private static final String INSTANCE_CONFIG_TEMPLATE = - "{\n" + " \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" - + " \"simpleFields\": {\n" + " \"HELIX_ENABLED\": \"true\",\n" - + " \"HELIX_ENABLED_TIMESTAMP\": \"1688959934305\",\n" - + " \"HELIX_HOST\": \"pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local\",\n" - + " \"HELIX_PORT\": \"8098\",\n" + " \"adminPort\": \"8097\",\n" + " \"grpcPort\": \"8090\",\n" - + " \"queryMailboxPort\": \"46347\",\n" + " \"queryServerPort\": \"45031\",\n" - + " \"shutdownInProgress\": \"false\"\n" + " },\n" + " \"mapFields\": {\n" - + " \"SYSTEM_RESOURCE_INFO\": {\n" + " \"numCores\": \"16\",\n" - + " \"totalMemoryMB\": \"126976\",\n" + " \"maxHeapSizeMB\": \"65536\"\n" + " },\n" - + " \"pool\": {\n" + " \"DefaultTenant_OFFLINE\": \"${pool}\",\n" - + " \"${poolName}\": \"${pool}\",\n" + " \"AllReplicationGroups\": \"1\"\n" + " }\n" + " },\n" - + " \"listFields\": {\n" + " \"TAG_LIST\": [\n" + " \"DefaultTenant_OFFLINE\",\n" - + " \"DefaultTenant_REALTIME\",\n" + " \"${poolName}\",\n" + " \"AllReplicationGroups\"\n" - + " ]\n" + " }\n" + "}"; + "{\n" + + " \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"simpleFields\": {\n" + + " \"HELIX_ENABLED\": \"true\",\n" + + " \"HELIX_ENABLED_TIMESTAMP\": \"1688959934305\",\n" + + " \"HELIX_HOST\": \"pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local\",\n" + + " \"HELIX_PORT\": \"8098\",\n" + + " \"adminPort\": \"8097\",\n" + + " \"grpcPort\": \"8090\",\n" + + " \"queryMailboxPort\": \"46347\",\n" + + " \"queryServerPort\": \"45031\",\n" + + " \"shutdownInProgress\": \"false\"\n" + + " },\n" + + " \"mapFields\": {\n" + + " \"SYSTEM_RESOURCE_INFO\": {\n" + + " \"numCores\": \"16\",\n" + + " \"totalMemoryMB\": \"126976\",\n" + + " \"maxHeapSizeMB\": \"65536\"\n" + + " },\n" + + " \"pool\": {\n" + + " \"DefaultTenant_OFFLINE\": \"${pool}\",\n" + + " \"${poolName}\": \"${pool}\",\n" + + " \"AllReplicationGroups\": \"1\"\n" + + " }\n" + + " },\n" + + " \"listFields\": {\n" + + " \"TAG_LIST\": [\n" + + " \"DefaultTenant_OFFLINE\",\n" + + " \"DefaultTenant_REALTIME\",\n" + + " \"${poolName}\",\n" + + " \"AllReplicationGroups\"\n" + + " ]\n" + + " }\n" + + "}"; + //@formatter:on @Test public void testPoolsWhenOneMorePoolAddedAndOneMoreReplicaGroupsNeeded() throws JsonProcessingException { + //@formatter:off String existingPartitionsJson = - " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" - + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n" - + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" - + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" - + " ]\n" + " }\n" + " }\n"; + "{\n" + + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" + + " \"partitionToInstancesMap\": {\n" + + " \"0_0\": [\n" + + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ]\n" + + " }\n" + + "}"; + //@formatter:on InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class); InstanceReplicaGroupPartitionConfig config = new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null); @@ -94,33 +124,47 @@ public class InstanceReplicaGroupPartitionSelectorTest { // Now that 1 more pool is added and 1 more RG is needed, a new set called "0_1" is generated, // and the instances from Pool 1 are assigned to this new replica. + //@formatter:off String expectedInstancePartitions = - " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" - + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n" - + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" - + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" - + " ],\n" + " \"0_1\": [\n" - + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" - + " \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" - + " ]\n" + " }\n" + " }\n"; + "{\n" + + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" + + " \"partitionToInstancesMap\": {\n" + + " \"0_0\": [\n" + + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ],\n" + + " \"0_1\": [\n" + + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ]\n" + + " }\n" + + "}"; + //@formatter:on InstancePartitions expectedPartitions = OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class); - assert assignedPartitions.equals(expectedPartitions); + assertEquals(assignedPartitions, expectedPartitions); } @Test public void testSelectPoolsWhenExistingReplicaGroupMapsToMultiplePools() throws JsonProcessingException { // The "rg0-2" instance used to belong to Pool 1, but now it belongs to Pool 0. + //@formatter:off String existingPartitionsJson = - " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" - + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n" - + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" - + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" - + " ],\n" + " \"0_1\": [\n" - + " \"Server_pinot-server-rg0-2.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" - + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" - + " ]\n" + " }\n" + " }\n"; + "{\n" + + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" + + " \"partitionToInstancesMap\": {\n" + + " \"0_0\": [\n" + + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ],\n" + + " \"0_1\": [\n" + + " \"Server_pinot-server-rg0-2.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ]\n" + + " }\n" + + "}"; + //@formatter:on InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class); InstanceReplicaGroupPartitionConfig config = new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null); @@ -150,17 +194,24 @@ public class InstanceReplicaGroupPartitionSelectorTest { // The "rg0-2" instance is replaced by "rg1-0" (which belongs to Pool 1), as "rg0-2" no longer belongs to Pool 1. // And "rg1-0" remains the same position as it's always under Pool 1. + //@formatter:off String expectedInstancePartitions = - " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" - + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n" - + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" - + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" - + " ],\n" + " \"0_1\": [\n" - + " \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" - + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" - + " ]\n" + " }\n" + " }\n"; + "{\n" + + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" + + " \"partitionToInstancesMap\": {\n" + + " \"0_0\": [\n" + + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ],\n" + + " \"0_1\": [\n" + + " \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ]\n" + + " }\n" + + "}"; + //@formatter:on InstancePartitions expectedPartitions = OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class); - assert assignedPartitions.equals(expectedPartitions); + assertEquals(assignedPartitions, expectedPartitions); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java index 45645387af..be18d35e50 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java @@ -30,11 +30,7 @@ public class Pairs { } public static Comparator<IntPair> intPairComparator() { - return new AscendingIntPairComparator(true); - } - - public static Comparator<IntPair> intPairComparator(boolean ascending) { - return new AscendingIntPairComparator(ascending); + return new AscendingIntPairComparator(); } public static class IntPair { @@ -83,26 +79,13 @@ public class Pairs { } public static class AscendingIntPairComparator implements Comparator<IntPair> { - private boolean _ascending; - - public AscendingIntPairComparator(boolean ascending) { - _ascending = ascending; - } @Override public int compare(IntPair pair1, IntPair pair2) { if (pair1._left != pair2._left) { - if (_ascending) { - return Integer.compare(pair1._left, pair2._left); - } else { - return Integer.compare(pair2._left, pair1._left); - } + return Integer.compare(pair1._left, pair2._left); } else { - if (_ascending) { - return Integer.compare(pair1._right, pair2._right); - } else { - return Integer.compare(pair2._right, pair1._right); - } + return Integer.compare(pair1._right, pair2._right); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
