jackjlli commented on code in PR #8483:
URL: https://github.com/apache/pinot/pull/8483#discussion_r856840900
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -180,14 +266,104 @@ public void selectInstances(Map<Integer,
List<InstanceConfig>> poolToInstanceCon
numInstancesToSelect = numInstanceConfigs;
}
- List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
- for (int i = 0; i < numInstancesToSelect; i++) {
- instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+ List<String> instancesToSelect;
+ if (_replicaGroupPartitionConfig.isMinimizeDataMovement() &&
_existingInstancePartitions != null) {
+ // Minimize data movement.
+ List<String> existingInstances =
_existingInstancePartitions.getInstances(0, 0);
+ LinkedHashSet<String> candidateInstances = new LinkedHashSet<>();
+ instanceConfigs.forEach(k ->
candidateInstances.add(k.getInstanceName()));
+ instancesToSelect =
+ getInstancesWithMinimumMovement(numInstancesToSelect,
candidateInstances, existingInstances);
+ } else {
+ // Select instances sequentially.
+ instancesToSelect = new ArrayList<>(numInstancesToSelect);
+ for (int i = 0; i < numInstancesToSelect; i++) {
+ instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+ }
}
instancesToSelect.sort(null);
LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect,
_tableNameWithType);
// Set the instances as partition 0 replica 0
instancePartitions.setInstances(0, 0, instancesToSelect);
}
}
+
+ /**
+ * Select instances with minimum movement.
+ * This algorithm can solve the following scenarios:
+ * * swap an instance
+ * * add/remove replica groups
+ * * increase/decrease number of instances per replica group
+ * TODO: handle the scenarios that selected pools are changed.
+ * TODO: improve the algorithm by doing the following steps:
+ * 1. assign the existing instances for all partitions;
+ * 2. assign the vacant positions based on the partitions already
assigned to each instance.
+ * @param numInstancesToSelect number of instances to select
+ * @param candidateInstances candidate instances to be selected
+ * @param existingInstances list of existing instances
+ */
+ private static List<String> getInstancesWithMinimumMovement(int
numInstancesToSelect,
+ LinkedHashSet<String> candidateInstances, List<String>
existingInstances) {
+ // Initialize the list with empty positions to fill.
+ List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
+ for (int i = 0; i < numInstancesToSelect; i++) {
+ instancesToSelect.add(null);
+ }
+ Deque<String> newlyAddedInstances = new LinkedList<>();
+
+ // Find out the existing instances that are still alive.
+ Set<String> existingInstancesStillAlive = new HashSet<>();
+ for (String existingInstance : existingInstances) {
+ if (candidateInstances.contains(existingInstance)) {
+ existingInstancesStillAlive.add(existingInstance);
+ }
+ }
+
+ // Find out the newly added instances.
+ for (String candidateInstance : candidateInstances) {
+ if (!existingInstancesStillAlive.contains(candidateInstance)) {
+ newlyAddedInstances.add(candidateInstance);
+ }
+ }
+
+ int numExistingInstances = existingInstances.size();
+ for (int i = 0; i < numInstancesToSelect; i++) {
+ String existingInstance = i < numExistingInstances ?
existingInstances.get(i) : null;
+ String selectedInstance;
+ if (existingInstance != null &&
candidateInstances.contains(existingInstance)) {
+ selectedInstance = existingInstance;
+ existingInstancesStillAlive.remove(selectedInstance);
+ } else {
+ selectedInstance = newlyAddedInstances.poll();
+ }
+ instancesToSelect.set(i, selectedInstance);
+ // If it's an existing alive instance, or it's for a new replica group,
add the new instance to the tail,
+ // so that it won't be firstly chosen for the next partition.
+ // For newly added instances to fill the existing replica group, the
sequence cannot change;
+ // otherwise there is no guarantee that same vacant position will be
filled with the same new instance.
+ // The 'selectedInstance' object can still be null if there is no new
instances from the candidate list.
+ if (selectedInstance != null && (i < numExistingInstances ||
existingInstances.isEmpty())) {
+ candidateInstances.remove(selectedInstance);
+ candidateInstances.add(selectedInstance);
+ }
+ }
+
+ // If there are still some vacant positions in the instance list,
+ // try to fill with instances which are either left over or newly added.
+ for (int i = 0; i < instancesToSelect.size(); i++) {
+ if (instancesToSelect.get(i) == null) {
+ if (!existingInstancesStillAlive.isEmpty()) {
+ Iterator<String> iterator = existingInstancesStillAlive.iterator();
+ String existingInstanceLeftOver = iterator.next();
+ instancesToSelect.set(i, existingInstanceLeftOver);
+ iterator.remove();
+ } else if (!newlyAddedInstances.isEmpty()) {
+ // pick a new instance to fill its vacant position.
+ String newInstance = newlyAddedInstances.pollFirst();
+ instancesToSelect.set(i, newInstance);
+ }
+ }
+ }
Review Comment:
This is still needed for scenarios in which some of the instances have been
removed; in that case, the instances from the tail fill the vacant positions.
E.g. suppose the existing instance assignment is [i1, i2, i3, i4, i5]. If
i3 is removed/decommissioned, the very last instance (i.e. i5) should fill
i3's vacant position.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]