jtao15 commented on a change in pull request #8441:
URL: https://github.com/apache/pinot/pull/8441#discussion_r840958651
##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
##########
@@ -48,52 +52,121 @@ public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String table
/**
* Returns a map from pool to instance configs based on the tag and pool config for the given instance configs.
+ * @param instanceConfigs list of latest instance configs from ZK.
+ * @param existingPoolToInstancesMap existing instance with sequence that should be respected. An empty list
+ * means no preceding sequence to respect and the instances would be sorted.
*/
- public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> instanceConfigs) {
+ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> instanceConfigs,
+ Map<Integer, List<String>> existingPoolToInstancesMap) {
int tableNameHash = Math.abs(_tableNameWithType.hashCode());
LOGGER.info("Starting instance tag/pool selection for table: {} with hash: {}", _tableNameWithType, tableNameHash);
- // Filter out the instances with the correct tag
+ // If existingPoolToInstancesMap is null, treat it as an empty map.
+ if (existingPoolToInstancesMap == null) {
+ existingPoolToInstancesMap = Collections.emptyMap();
+ }
+ // Filter out the instances with the correct tag.
+ // Use LinkedHashMap here to retain the sorted list of instance names.
String tag = _tagPoolConfig.getTag();
- List<InstanceConfig> candidateInstanceConfigs = new ArrayList<>();
+ Map<String, InstanceConfig> candidateInstanceConfigsMap = new LinkedHashMap<>();
for (InstanceConfig instanceConfig : instanceConfigs) {
if (instanceConfig.getTags().contains(tag)) {
- candidateInstanceConfigs.add(instanceConfig);
+ candidateInstanceConfigsMap.put(instanceConfig.getInstanceName(), instanceConfig);
}
}
- candidateInstanceConfigs.sort(Comparator.comparing(InstanceConfig::getInstanceName));
- int numCandidateInstances = candidateInstanceConfigs.size();
+
+ // Find out newly added instances from the latest copies of instance configs.
+ // A deque is used here in order to retain the sequence,
+ // given the fact that the list of instance configs is always sorted.
+ Deque<String> newlyAddedInstances = new LinkedList<>(candidateInstanceConfigsMap.keySet());
+ for (List<String> existingInstancesWithSequence : existingPoolToInstancesMap.values()) {
+ newlyAddedInstances.removeAll(existingInstancesWithSequence);
+ }
+
+ int numCandidateInstances = candidateInstanceConfigsMap.size();
Preconditions.checkState(numCandidateInstances > 0, "No enabled instance has the tag: %s", tag);
LOGGER.info("{} enabled instances have the tag: {} for table: {}", numCandidateInstances, tag, _tableNameWithType);
- Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new TreeMap<>();
+ Map<Integer, List<InstanceConfig>> poolToLatestInstanceConfigsMap = new TreeMap<>();
if (_tagPoolConfig.isPoolBased()) {
- // Pool based selection
+ // Pool based selection. All the instances should be associated with a specific pool number.
+ // Instance selection should be done within the same pool.
+ // E.g.: Pool0 -> [ I1, I2, I3 ]
+ // Pool1 -> [ I4, I5, I6 ]
- // Extract the pool information from the instance configs
- for (InstanceConfig instanceConfig : candidateInstanceConfigs) {
+ // Each pool number associates with a map that key is the instance name and value is the instance config.
+ Map<Integer, Map<String, InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>();
+ // Each pool number associates with a list of newly added instance configs,
+ // so that new instances can be fetched from this list.
+ Map<Integer, Deque<InstanceConfig>> poolToNewInstanceConfigsMap = new HashMap<>();
+
+ // Extract the pool information from the instance configs.
+ for (Map.Entry<String, InstanceConfig> entry : candidateInstanceConfigsMap.entrySet()) {
+ String instanceName = entry.getKey();
+ InstanceConfig instanceConfig = entry.getValue();
Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
if (poolMap != null && poolMap.containsKey(tag)) {
int pool = Integer.parseInt(poolMap.get(tag));
- poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(instanceConfig);
+ poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new TreeMap<>()).put(instanceName, instanceConfig);
+ if (newlyAddedInstances.contains(instanceName)) {
+ poolToNewInstanceConfigsMap.computeIfAbsent(pool, k -> new LinkedList<>()).add(instanceConfig);
+ }
+ }
+ }
+
+ for (Map.Entry<Integer, List<String>> entry : existingPoolToInstancesMap.entrySet()) {
+ Integer pool = entry.getKey();
+ List<String> existingInstanceAssignmentInPool = entry.getValue();
+ List<InstanceConfig> candidateInstanceConfigsWithSequence = new ArrayList<>();
+ for (String existingInstance: existingInstanceAssignmentInPool) {
+ InstanceConfig instanceConfig = poolToInstanceConfigsMap.get(pool).get(existingInstance);
+ // Add instances to the candidate list and respect the sequence of the existing instances from the ZK.
+ // The missing/removed instances will be replaced by the newly instances.
+ // If the instance still exists from ZK, then add it to the candidate list.
+ // E.g. if the old instances are: [I1, I2, I3, I4] and the new instance are: [I1, I3, I4, I5, I6],
+ // the removed instance is I2 and the newly added instances are I5 and I6.
+ // The position of I2 would be replaced by I5, the new remaining I6 would be appended to the tail.
+ // Thus, the new order would be [I1, I5, I3, I4, I6].
+ if (instanceConfig != null) {
+ candidateInstanceConfigsWithSequence.add(instanceConfig);
+ } else {
+ // The current chosen instance no longer lives in the cluster any more, thus pick a new instance.
+ InstanceConfig newInstanceConfig = poolToNewInstanceConfigsMap.get(pool).pollFirst();
+ // If there is no new instance from the same pool, then don't add it.
+ if (newInstanceConfig != null) {
+ candidateInstanceConfigsWithSequence.add(newInstanceConfig);
+ }
+ }
}
+ poolToLatestInstanceConfigsMap.put(pool, candidateInstanceConfigsWithSequence);
Review comment:
`candidateInstanceConfigsWithSequence` can be empty if the pool no longer exists (i.e., both `poolToInstanceConfigsMap.get(pool)` and `poolToNewInstanceConfigsMap.get(pool)` are null)?
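A minimal sketch of the per-pool reordering with null-safe lookups. As quoted, `poolToInstanceConfigsMap.get(pool).get(existingInstance)` would throw a `NullPointerException`, rather than leave the list empty, when `pool` no longer has any tagged instances; likewise `poolToNewInstanceConfigsMap.get(pool).pollFirst()` throws when a pool lost an instance but gained none. The sketch is hypothetical: `SequencePreservingSketch`, `reorder`, and `reorderPool` are illustrative names, plain strings stand in for `InstanceConfig`, newly added instances are computed per pool (the diff computes them across all pools), and leftover new instances are appended to the tail as the code comment describes (that step is outside the quoted excerpt). It assumes Java 9+ for `List.of`/`Map.of`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: instance names stand in for InstanceConfig objects.
class SequencePreservingSketch {

  // Keeps surviving instances at their old positions, fills each vacated slot with the next
  // newly added instance (in sorted order), and appends leftover new instances to the tail.
  static List<String> reorder(List<String> existingSequence, List<String> latestSortedInstances) {
    Set<String> latest = new HashSet<>(latestSortedInstances);
    // Newly added = currently present but not in the old sequence (computed per pool here).
    Deque<String> newlyAdded = new ArrayDeque<>(latestSortedInstances);
    newlyAdded.removeAll(existingSequence);

    List<String> result = new ArrayList<>();
    for (String instance : existingSequence) {
      if (latest.contains(instance)) {
        result.add(instance); // survivor keeps its slot
      } else if (!newlyAdded.isEmpty()) {
        result.add(newlyAdded.pollFirst()); // vacated slot takes the next new instance
      }
      // Otherwise no replacement is available and the slot is dropped.
    }
    result.addAll(newlyAdded); // remaining new instances go to the tail
    return result;
  }

  // Null-safe lookups: a pool missing from either map is treated as empty instead of causing an NPE.
  static List<String> reorderPool(int pool, Map<Integer, List<String>> existingPoolToInstances,
      Map<Integer, List<String>> latestPoolToSortedInstances) {
    return reorder(existingPoolToInstances.getOrDefault(pool, Collections.emptyList()),
        latestPoolToSortedInstances.getOrDefault(pool, Collections.emptyList()));
  }

  public static void main(String[] args) {
    // Example from the quoted code comment: I2 removed, I5 and I6 added.
    System.out.println(reorder(List.of("I1", "I2", "I3", "I4"), List.of("I1", "I3", "I4", "I5", "I6")));
    // prints [I1, I5, I3, I4, I6]

    // Pool 1 had instances before but has none now: the result is empty, not an exception.
    System.out.println(reorderPool(1, Map.of(1, List.of("I7", "I8")), Map.of(0, List.of("I1"))));
    // prints []
  }
}
```

With `Map.getOrDefault`, a vanished pool yields an empty list, which is exactly the case the question above raises; a caller would still need to decide whether such a pool should be dropped from `poolToLatestInstanceConfigsMap` or rejected.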
##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
##########
@@ -48,52 +52,117 @@ public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String table
+ for (Map.Entry<Integer, List<String>> entry : existingPoolToInstancesMap.entrySet()) {
+ Integer pool = entry.getKey();
+ List<String> existingInstanceAssignmentInPool = entry.getValue();
+ List<InstanceConfig> candidateInstanceConfigsWithSequence = new ArrayList<>();
+ for (String existingInstance: existingInstanceAssignmentInPool) {
+ InstanceConfig instanceConfig = poolToInstanceConfigsMap.get(pool).get(existingInstance);
+ // Add instances to the candidate list and respect the sequence of the existing instances from the ZK.
+ // The missing/removed instances will be replaced by the newly instances.
+ // If the instance still exists from ZK, then add it to the candidate list.
+ // E.g. if the old instances are: [I1, I2, I3, I4] and the new instance are: [I1, I3, I4, I5, I6],
+ // the removed instance is I2 and the newly added instances are I5 and I6.
+ // The position of I2 would be replaced by I5, the new remaining I6 would be appended to the tail.
+ // Thus, the new order would be [I1, I5, I3, I4, I6].
Review comment:
`Collections.rotate(instanceConfigs, -(tableNameHash % numInstanceConfigs));`
This does not guarantee the same rotation as before, because `numInstanceConfigs` changes when instances are added or removed?
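To make the rotation concern concrete, here is a small sketch. `RotationSketch`, `rotate`, and the hash value 7 are hypothetical, and plain strings stand in for `InstanceConfig` objects; only the `Collections.rotate(..., -(tableNameHash % size))` expression comes from the comment above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: plain strings stand in for InstanceConfig objects.
class RotationSketch {

  // Applies the rotation quoted in the comment to a copy of the list.
  static List<String> rotate(List<String> instances, int tableNameHash) {
    List<String> rotated = new ArrayList<>(instances);
    if (!rotated.isEmpty()) {
      Collections.rotate(rotated, -(tableNameHash % rotated.size()));
    }
    return rotated;
  }

  public static void main(String[] args) {
    int tableNameHash = 7; // hypothetical hash value, for illustration only
    System.out.println(rotate(List.of("I1", "I2", "I3", "I4"), tableNameHash));       // [I4, I1, I2, I3]
    System.out.println(rotate(List.of("I1", "I5", "I3", "I4", "I6"), tableNameHash)); // [I3, I4, I6, I1, I5]
  }
}
```

With the assumed hash, the four-instance list is rotated left by 3 and the five-instance list left by 2, so the head of the list changes from I4 to I3 as soon as one instance is added.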
Also, even if we maintain the order after applying the constraints, the current `InstanceReplicaGroupPartitionSelector` will pick instances based on the number of replica-groups for each pool.
Say we want to change from 2 * 2 to 3 * 2 (replica-groups * instances per replica-group); the old instance sequence is [I1, I2, I3, I4] and the new sequence is [I1, I5, I3, I4, I6, I7]. The old assignment is:
```
r1 -> {I1, I3}
r2 -> {I2, I4}
```
The new assignment will be:
```
r1 -> {I1, I4}
r2 -> {I5, I6}
r3 -> {I3, I7}
```
But ideally, I3 and I4 should not be moved from one replica-group to another.
We need to improve `InstanceReplicaGroupPartitionSelector.selectInstances()` to handle such cases.
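The replica-group example above can be reproduced with a small sketch. It assumes, as the example implies, that the i-th instance in the sequence goes to replica-group `i % numReplicaGroups`; this is inferred from the example, not taken from the `InstanceReplicaGroupPartitionSelector` source. `ReplicaGroupSketch`, `roundRobin`, and `sticky` are hypothetical names, and `sticky` is only one possible direction for the improvement, not an agreed design.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: plain strings stand in for InstanceConfig objects.
class ReplicaGroupSketch {

  // Assumed round-robin placement: the i-th instance goes to replica-group i % numReplicaGroups.
  static List<List<String>> roundRobin(List<String> instances, int numReplicaGroups,
      int numInstancesPerReplicaGroup) {
    List<List<String>> groups = new ArrayList<>();
    for (int r = 0; r < numReplicaGroups; r++) {
      groups.add(new ArrayList<>());
    }
    int numSlots = Math.min(instances.size(), numReplicaGroups * numInstancesPerReplicaGroup);
    for (int i = 0; i < numSlots; i++) {
      groups.get(i % numReplicaGroups).add(instances.get(i));
    }
    return groups;
  }

  // Hypothetical "sticky" variant: live instances stay in their old replica-group, and
  // vacated or newly created slots are filled from the remaining instances in sequence order.
  static List<List<String>> sticky(List<List<String>> oldGroups, List<String> instances,
      int numReplicaGroups, int numInstancesPerReplicaGroup) {
    Set<String> alive = new HashSet<>(instances);
    Set<String> kept = new HashSet<>();
    List<List<String>> groups = new ArrayList<>();
    for (int r = 0; r < numReplicaGroups; r++) {
      List<String> group = new ArrayList<>();
      List<String> oldGroup = r < oldGroups.size() ? oldGroups.get(r) : List.<String>of();
      for (String instance : oldGroup) {
        if (alive.contains(instance) && !kept.contains(instance)
            && group.size() < numInstancesPerReplicaGroup) {
          group.add(instance);
          kept.add(instance);
        }
      }
      groups.add(group);
    }
    // Instances not kept in place, in their sequence order, fill the open slots.
    Deque<String> spare = new ArrayDeque<>();
    for (String instance : instances) {
      if (!kept.contains(instance)) {
        spare.add(instance);
      }
    }
    for (List<String> group : groups) {
      while (group.size() < numInstancesPerReplicaGroup && !spare.isEmpty()) {
        group.add(spare.pollFirst());
      }
    }
    return groups;
  }

  public static void main(String[] args) {
    List<List<String>> oldGroups = roundRobin(List.of("I1", "I2", "I3", "I4"), 2, 2);
    List<String> newSequence = List.of("I1", "I5", "I3", "I4", "I6", "I7");
    System.out.println(oldGroups);                            // [[I1, I3], [I2, I4]]
    System.out.println(roundRobin(newSequence, 3, 2));        // [[I1, I4], [I5, I6], [I3, I7]]
    System.out.println(sticky(oldGroups, newSequence, 3, 2)); // [[I1, I3], [I4, I5], [I6, I7]]
  }
}
```

The `sticky` variant keeps I3 in r1 and I4 in r2; only the slot vacated by I2 and the new replica-group r3 receive new instances. It does not attempt to balance pools or handle partition-level selection.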
--
This is an automated message from the Apache Git Service.