Jackie-Jiang commented on code in PR #17515:
URL: https://github.com/apache/pinot/pull/17515#discussion_r2738029106
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -157,22 +159,25 @@ public class TableRebalancer {
private final RebalancePreChecker _rebalancePreChecker;
private final TableSizeReader _tableSizeReader;
private final PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+ private final boolean _updateIdealStateInstancePartitions;
public TableRebalancer(HelixManager helixManager, @Nullable
TableRebalanceObserver tableRebalanceObserver,
@Nullable ControllerMetrics controllerMetrics, @Nullable
RebalancePreChecker rebalancePreChecker,
@Nullable TableSizeReader tableSizeReader,
- @Nullable PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager)
{
+ @Nullable PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager,
+ boolean updateIdealStateInstancePartitions) {
_helixManager = helixManager;
_tableRebalanceObserver =
Objects.requireNonNullElseGet(tableRebalanceObserver,
NoOpTableRebalanceObserver::new);
_helixDataAccessor = helixManager.getHelixDataAccessor();
_controllerMetrics = controllerMetrics;
_rebalancePreChecker = rebalancePreChecker;
_tableSizeReader = tableSizeReader;
_pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+ _updateIdealStateInstancePartitions = updateIdealStateInstancePartitions;
}
public TableRebalancer(HelixManager helixManager) {
- this(helixManager, null, null, null, null, null);
+ this(helixManager, null, null, null, null, null, true);
Review Comment:
Should this be `true` or `false`?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -2242,4 +2303,15 @@ private IdealState
forceCommitConsumingSegmentsAndWait(String tableNameWithType,
tableRebalanceLogger.info("Successfully force committed {} consuming
segments", segmentsToCommit.size());
return
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
}
+
+ private IdealState replaceInstancePartitionsInIdealState(String
tableNameWithType, IdealState currentIdealState,
+ List<InstancePartitions> instancePartitionsList) {
+ Map<String, List<String>> idealStateListFields =
currentIdealState.getRecord().getListFields();
+
InstancePartitionsUtils.replaceInstancePartitionsInIdealState(currentIdealState,
instancePartitionsList);
+
+ return HelixHelper.updateIdealState(_helixManager, tableNameWithType, is
-> {
Review Comment:
We shouldn't perform a retry here. The update needs to be a version-checked
update to ensure consistency of the IS
##########
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java:
##########
@@ -212,4 +233,101 @@ public static boolean
shouldFetchPreConfiguredInstancePartitions(TableConfig tab
return hasPreConfiguredInstancePartitions(tableConfig,
instancePartitionsType)
&&
!InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig,
instancePartitionsType);
}
+
+ /**
+ * Update a given set of instance partitions into an ideal state's instance
partitions metadata maintained in its
+ * list fields. Previous instance partitions will be wiped out.
+ *
+ * @param idealState Current ideal state
+ * @param instancePartitionsList List of instance partitions to be written
into the ideal state instance partitions
+ * metadata.
+ */
+ public static void replaceInstancePartitionsInIdealState(IdealState
idealState,
+ List<InstancePartitions> instancePartitionsList) {
+ Map<String, List<String>> idealStateListFields =
idealState.getRecord().getListFields();
+ idealStateListFields.keySet().removeIf(key ->
key.startsWith(InstancePartitionsUtils.IDEAL_STATE_IP_PREFIX));
+ updateInstancePartitionsInIdealState(idealState, instancePartitionsList);
+ }
+
+ /**
+ * Add a given set of instance partitions into an ideal state's instance
partitions metadata maintained in its
+ * list fields.
+ *
+ * @param idealState Current ideal state
+ * @param instancePartitionsList List of instance partitions to be added to
the ideal state instance partitions
+ * metadata.
+ */
+ public static void updateInstancePartitionsInIdealState(IdealState
idealState,
+ List<InstancePartitions> instancePartitionsList) {
+ Map<String, List<String>> idealStateListFields =
idealState.getRecord().getListFields();
+ for (InstancePartitions instancePartitions : instancePartitionsList) {
+ String instancePartitionsName =
instancePartitions.getInstancePartitionsName();
+ for (String partitionReplica :
instancePartitions.getPartitionToInstancesMap().keySet()) {
+ String idealStateListKey =
InstancePartitionsUtils.IDEAL_STATE_IP_PREFIX + instancePartitionsName
+ + InstancePartitionsUtils.IDEAL_STATE_IP_SEPARATOR +
partitionReplica;
+ idealStateListFields.put(idealStateListKey, new
ArrayList<>(instancePartitions.getInstances(partitionReplica)));
Review Comment:
(minor) No need to make a copy given IP is read only
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1854,7 +1809,7 @@ public void addTable(TableConfig tableConfig,
List<Pair<PartitionGroupMetadata,
Preconditions.checkState(tableConfig != null, "Failed to read table
config for table: %s", tableNameWithType);
// Assign instances
- assignInstances(tableConfig, true);
+ assignInstances(tableConfig, idealState, true);
Review Comment:
Should we revert the changes for instance assignment?
We should modify the IS when assigning segments, not instances
##########
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java:
##########
@@ -212,4 +233,101 @@ public static boolean
shouldFetchPreConfiguredInstancePartitions(TableConfig tab
return hasPreConfiguredInstancePartitions(tableConfig,
instancePartitionsType)
&&
!InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig,
instancePartitionsType);
}
+
+ /**
+ * Update a given set of instance partitions into an ideal state's instance
partitions metadata maintained in its
+ * list fields. Previous instance partitions will be wiped out.
+ *
+ * @param idealState Current ideal state
+ * @param instancePartitionsList List of instance partitions to be written
into the ideal state instance partitions
+ * metadata.
+ */
+ public static void replaceInstancePartitionsInIdealState(IdealState
idealState,
+ List<InstancePartitions> instancePartitionsList) {
+ Map<String, List<String>> idealStateListFields =
idealState.getRecord().getListFields();
+ idealStateListFields.keySet().removeIf(key ->
key.startsWith(InstancePartitionsUtils.IDEAL_STATE_IP_PREFIX));
+ updateInstancePartitionsInIdealState(idealState, instancePartitionsList);
+ }
+
+ /**
+ * Add a given set of instance partitions into an ideal state's instance
partitions metadata maintained in its
+ * list fields.
+ *
+ * @param idealState Current ideal state
+ * @param instancePartitionsList List of instance partitions to be added to
the ideal state instance partitions
+ * metadata.
+ */
+ public static void updateInstancePartitionsInIdealState(IdealState
idealState,
+ List<InstancePartitions> instancePartitionsList) {
+ Map<String, List<String>> idealStateListFields =
idealState.getRecord().getListFields();
+ for (InstancePartitions instancePartitions : instancePartitionsList) {
+ String instancePartitionsName =
instancePartitions.getInstancePartitionsName();
+ for (String partitionReplica :
instancePartitions.getPartitionToInstancesMap().keySet()) {
+ String idealStateListKey =
InstancePartitionsUtils.IDEAL_STATE_IP_PREFIX + instancePartitionsName
+ + InstancePartitionsUtils.IDEAL_STATE_IP_SEPARATOR +
partitionReplica;
+ idealStateListFields.put(idealStateListKey, new
ArrayList<>(instancePartitions.getInstances(partitionReplica)));
+ }
+ }
+ }
+
+ /**
+ * Extracts all instance partitions from the ideal state's list fields.
+ *
+ * <p>The ideal state stores instance partitions metadata in list fields
with keys of the format:
+ * {@code
INSTANCE_PARTITIONS__<instancePartitionsName>__<partitionId>_<replicaGroupId>}
+ *
+ * <p>The instance partitions name itself may contain {@code __} (e.g., for
tier instance partitions like
+ * {@code myTable__TIER__hotTier}), so we parse from the right by finding
the last {@code __} separator
+ * which should always be followed by a valid 'partitionId_replicaId'
pattern ({@code <int>_<int>}).
+ *
+ * @param idealState The ideal state to extract instance partitions from
+ * @return A map from instance partitions name to the reconstructed
InstancePartitions object
+ */
+ public static Map<String, InstancePartitions>
extractInstancePartitionsFromIdealState(IdealState idealState) {
+ Map<String, InstancePartitions> instancePartitionsMap = new HashMap<>();
+ for (Map.Entry<String, List<String>> entry :
idealState.getRecord().getListFields().entrySet()) {
+ String key = entry.getKey();
+ // Keys look like 'INSTANCE_PARTITIONS__myTable_CONSUMING__0_1' and
+ // 'INSTANCE_PARTITIONS__myTable__TIER__hotTier__0_1'
+ if (key.startsWith(IDEAL_STATE_IP_PREFIX)) {
+ String remainder = key.substring(IDEAL_STATE_IP_PREFIX.length());
+
+ // The last '__' separates the instance partitions name from the
partition-replica suffix
+ int lastSeparatorIdx = remainder.lastIndexOf(IDEAL_STATE_IP_SEPARATOR);
+ String instancePartitionsName = remainder.substring(0,
lastSeparatorIdx);
+ String partitionReplica = remainder.substring(lastSeparatorIdx +
IDEAL_STATE_IP_SEPARATOR.length());
+ List<String> instances = entry.getValue();
+
+ instancePartitionsMap.computeIfAbsent(instancePartitionsName,
InstancePartitions::new)
+ .setInstances(partitionReplica, instances);
+ }
+ }
+ return instancePartitionsMap;
+ }
+
+ /// Creates a map from server instance to replica group ID using the ideal
state instance partitions metadata.
+ public static Map<String, Integer> serverToReplicaGroupMap(IdealState
idealState) {
+ Map<String, Integer> serverToReplicaGroupMap = new HashMap<>();
+
+ for (Map.Entry<String, List<String>> listFields :
idealState.getRecord().getListFields().entrySet()) {
+ String key = listFields.getKey();
+ // key looks like "INSTANCE_PARTITIONS__<INSTANCE_PARTITION_NAME>"
+ // <INSTANCE_PARTITION_NAME> typically looks like myTable_CONSUMING__0_1
(here, 0 would be the partition ID and
+ // 1 would be the replica group ID)
+ if (key.startsWith(InstancePartitionsUtils.IDEAL_STATE_IP_PREFIX)) {
+ int separatorIndex =
key.lastIndexOf(InstancePartitions.PARTITION_REPLICA_GROUP_SEPARATOR);
+ Integer replicaGroup = Integer.parseInt(key.substring(separatorIndex +
1));
+ listFields.getValue().forEach(value -> {
+ if (serverToReplicaGroupMap.containsKey(value)) {
+ LOGGER.warn("Server {} assigned to multiple replica groups ({},
{})", value, replicaGroup,
Review Comment:
Is it possible that one server is assigned to multiple replicas? If so, will
this break routing?
Should we consider throwing an exception and falling back when this happens?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1520,6 +1524,27 @@ void
updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>>
throw new HelixHelper.PermanentUpdaterException(errorMsg);
}
}
+
+ if (_isIdealStateInstancePartitionsEnabled) {
+ // Check if ideal state instance partitions match the newly fetched /
computed instance partitions.
+ // If there's a mismatch, wipe out ideal state instance partitions and
log that a rebalance is required.
+ Map<String, InstancePartitions> idealStateInstancePartitions =
+
InstancePartitionsUtils.extractInstancePartitionsFromIdealState(idealState);
+ for (InstancePartitions instancePartitions :
instancePartitionsMap.values()) {
+ if (!instancePartitions.equals(
+
idealStateInstancePartitions.get(instancePartitions.getInstancePartitionsName())))
{
+ LOGGER.warn(
Review Comment:
We should add a table-level gauge to reflect whether the IP is wiped for an
IP-enabled table
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -296,6 +301,18 @@ private RebalanceResult doRebalance(TableConfig
tableConfig, RebalanceConfig reb
"Cannot rebalance disabled table without downtime", null, null,
null, null, null);
}
+ // Wipe out ideal state instance partitions metadata
Review Comment:
We shouldn't wipe it until a rebalance is indeed required.
E.g. when `segmentAssignmentUnchanged`, we should check if the instance
partitions changed, then modify accordingly.
If we wipe it here and the following part throws an exception, we might end up
with an IS without instance partitions
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -722,6 +764,25 @@ private RebalanceResult doRebalance(TableConfig
tableConfig, RebalanceConfig reb
tableRebalanceLogger.info(msg);
// Record completion
_tableRebalanceObserver.onSuccess(msg);
+
+ if (_updateIdealStateInstancePartitions) {
+ // Rebalance completed successfully, so we can update the instance
partitions in the ideal state to reflect
+ // the new set of instance partitions.
+ List<InstancePartitions> instancePartitionsList = new
ArrayList<>(instancePartitionsMap.values());
Review Comment:
Consider making the order of this list deterministic, so that we can check
if it is identical to the existing one
##########
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java:
##########
@@ -212,4 +233,101 @@ public static boolean
shouldFetchPreConfiguredInstancePartitions(TableConfig tab
return hasPreConfiguredInstancePartitions(tableConfig,
instancePartitionsType)
&&
!InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig,
instancePartitionsType);
}
+
+ /**
+ * Update a given set of instance partitions into an ideal state's instance
partitions metadata maintained in its
+ * list fields. Previous instance partitions will be wiped out.
+ *
+ * @param idealState Current ideal state
+ * @param instancePartitionsList List of instance partitions to be written
into the ideal state instance partitions
+ * metadata.
+ */
+ public static void replaceInstancePartitionsInIdealState(IdealState
idealState,
+ List<InstancePartitions> instancePartitionsList) {
+ Map<String, List<String>> idealStateListFields =
idealState.getRecord().getListFields();
+ idealStateListFields.keySet().removeIf(key ->
key.startsWith(InstancePartitionsUtils.IDEAL_STATE_IP_PREFIX));
+ updateInstancePartitionsInIdealState(idealState, instancePartitionsList);
+ }
+
+ /**
+ * Add a given set of instance partitions into an ideal state's instance
partitions metadata maintained in its
+ * list fields.
+ *
+ * @param idealState Current ideal state
+ * @param instancePartitionsList List of instance partitions to be added to
the ideal state instance partitions
+ * metadata.
+ */
+ public static void updateInstancePartitionsInIdealState(IdealState
idealState,
+ List<InstancePartitions> instancePartitionsList) {
+ Map<String, List<String>> idealStateListFields =
idealState.getRecord().getListFields();
+ for (InstancePartitions instancePartitions : instancePartitionsList) {
+ String instancePartitionsName =
instancePartitions.getInstancePartitionsName();
+ for (String partitionReplica :
instancePartitions.getPartitionToInstancesMap().keySet()) {
Review Comment:
(minor) We can do `entrySet()` to reduce map lookup
##########
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java:
##########
@@ -212,4 +233,101 @@ public static boolean
shouldFetchPreConfiguredInstancePartitions(TableConfig tab
return hasPreConfiguredInstancePartitions(tableConfig,
instancePartitionsType)
&&
!InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig,
instancePartitionsType);
}
+
+ /**
+ * Update a given set of instance partitions into an ideal state's instance
partitions metadata maintained in its
+ * list fields. Previous instance partitions will be wiped out.
+ *
+ * @param idealState Current ideal state
+ * @param instancePartitionsList List of instance partitions to be written
into the ideal state instance partitions
+ * metadata.
+ */
+ public static void replaceInstancePartitionsInIdealState(IdealState
idealState,
+ List<InstancePartitions> instancePartitionsList) {
+ Map<String, List<String>> idealStateListFields =
idealState.getRecord().getListFields();
+ idealStateListFields.keySet().removeIf(key ->
key.startsWith(InstancePartitionsUtils.IDEAL_STATE_IP_PREFIX));
+ updateInstancePartitionsInIdealState(idealState, instancePartitionsList);
+ }
+
+ /**
+ * Add a given set of instance partitions into an ideal state's instance
partitions metadata maintained in its
+ * list fields.
+ *
+ * @param idealState Current ideal state
+ * @param instancePartitionsList List of instance partitions to be added to
the ideal state instance partitions
+ * metadata.
+ */
+ public static void updateInstancePartitionsInIdealState(IdealState
idealState,
+ List<InstancePartitions> instancePartitionsList) {
+ Map<String, List<String>> idealStateListFields =
idealState.getRecord().getListFields();
+ for (InstancePartitions instancePartitions : instancePartitionsList) {
+ String instancePartitionsName =
instancePartitions.getInstancePartitionsName();
+ for (String partitionReplica :
instancePartitions.getPartitionToInstancesMap().keySet()) {
+ String idealStateListKey =
InstancePartitionsUtils.IDEAL_STATE_IP_PREFIX + instancePartitionsName
+ + InstancePartitionsUtils.IDEAL_STATE_IP_SEPARATOR +
partitionReplica;
+ idealStateListFields.put(idealStateListKey, new
ArrayList<>(instancePartitions.getInstances(partitionReplica)));
+ }
+ }
+ }
+
+ /**
+ * Extracts all instance partitions from the ideal state's list fields.
+ *
+ * <p>The ideal state stores instance partitions metadata in list fields
with keys of the format:
+ * {@code
INSTANCE_PARTITIONS__<instancePartitionsName>__<partitionId>_<replicaGroupId>}
+ *
+ * <p>The instance partitions name itself may contain {@code __} (e.g., for
tier instance partitions like
+ * {@code myTable__TIER__hotTier}), so we parse from the right by finding
the last {@code __} separator
+ * which should always be followed by a valid 'partitionId_replicaId'
pattern ({@code <int>_<int>}).
+ *
+ * @param idealState The ideal state to extract instance partitions from
+ * @return A map from instance partitions name to the reconstructed
InstancePartitions object
+ */
+ public static Map<String, InstancePartitions>
extractInstancePartitionsFromIdealState(IdealState idealState) {
+ Map<String, InstancePartitions> instancePartitionsMap = new HashMap<>();
+ for (Map.Entry<String, List<String>> entry :
idealState.getRecord().getListFields().entrySet()) {
+ String key = entry.getKey();
+ // Keys look like 'INSTANCE_PARTITIONS__myTable_CONSUMING__0_1' and
+ // 'INSTANCE_PARTITIONS__myTable__TIER__hotTier__0_1'
+ if (key.startsWith(IDEAL_STATE_IP_PREFIX)) {
+ String remainder = key.substring(IDEAL_STATE_IP_PREFIX.length());
+
+ // The last '__' separates the instance partitions name from the
partition-replica suffix
+ int lastSeparatorIdx = remainder.lastIndexOf(IDEAL_STATE_IP_SEPARATOR);
+ String instancePartitionsName = remainder.substring(0,
lastSeparatorIdx);
+ String partitionReplica = remainder.substring(lastSeparatorIdx +
IDEAL_STATE_IP_SEPARATOR.length());
+ List<String> instances = entry.getValue();
+
+ instancePartitionsMap.computeIfAbsent(instancePartitionsName,
InstancePartitions::new)
+ .setInstances(partitionReplica, instances);
+ }
+ }
+ return instancePartitionsMap;
+ }
+
+ /// Creates a map from server instance to replica group ID using the ideal
state instance partitions metadata.
+ public static Map<String, Integer> serverToReplicaGroupMap(IdealState
idealState) {
+ Map<String, Integer> serverToReplicaGroupMap = new HashMap<>();
+
+ for (Map.Entry<String, List<String>> listFields :
idealState.getRecord().getListFields().entrySet()) {
+ String key = listFields.getKey();
+ // key looks like "INSTANCE_PARTITIONS__<INSTANCE_PARTITION_NAME>"
+ // <INSTANCE_PARTITION_NAME> typically looks like myTable_CONSUMING__0_1
(here, 0 would be the partition ID and
+ // 1 would be the replica group ID)
+ if (key.startsWith(InstancePartitionsUtils.IDEAL_STATE_IP_PREFIX)) {
+ int separatorIndex =
key.lastIndexOf(InstancePartitions.PARTITION_REPLICA_GROUP_SEPARATOR);
+ Integer replicaGroup = Integer.parseInt(key.substring(separatorIndex +
1));
+ listFields.getValue().forEach(value -> {
+ if (serverToReplicaGroupMap.containsKey(value)) {
Review Comment:
(minor) You can call `put()` and check the return value to reduce map access.
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java:
##########
@@ -32,19 +33,23 @@ public class SegmentInstanceCandidate {
private final String _instance;
private final boolean _online;
private final int _pool;
+ private final int _replicaGroupId;
Review Comment:
(minor) Rename it to `_replicaId`
##########
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java:
##########
@@ -113,6 +114,12 @@ public List<String> getInstances(int partitionId, int
replicaGroupId) {
.get(Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR
+ replicaGroupId);
}
+ /// Given a partition ID and replica group ID like "0_0", return the list of
instances belonging to that instance
+ /// partition
+ public List<String> getInstances(String partitionReplica) {
Review Comment:
(minor) It is intentional to not provide this method to reduce map access.
Caller should use `entrySet()` of the `_partitionToInstancesMap` instead of
looking up each key
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -2242,4 +2303,15 @@ private IdealState
forceCommitConsumingSegmentsAndWait(String tableNameWithType,
tableRebalanceLogger.info("Successfully force committed {} consuming
segments", segmentsToCommit.size());
return
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
}
+
+ private IdealState replaceInstancePartitionsInIdealState(String
tableNameWithType, IdealState currentIdealState,
+ List<InstancePartitions> instancePartitionsList) {
+ Map<String, List<String>> idealStateListFields =
currentIdealState.getRecord().getListFields();
+
InstancePartitionsUtils.replaceInstancePartitionsInIdealState(currentIdealState,
instancePartitionsList);
+
+ return HelixHelper.updateIdealState(_helixManager, tableNameWithType, is
-> {
Review Comment:
We should wipe the IP with the first IS change, and restore it with the last
IS change. Replacing the IP as a separate step can cause inconsistency
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]