This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a37ced6ec9 Fix rebalance on upsert table (#12054)
a37ced6ec9 is described below
commit a37ced6ec998aa771a71e6eba7624942b66d34b4
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Nov 27 18:13:00 2023 -0800
Fix rebalance on upsert table (#12054)
---
.../segment/StrictRealtimeSegmentAssignment.java | 174 ++++++++++++++-------
.../helix/core/rebalance/TableRebalancer.java | 58 ++++---
2 files changed, 155 insertions(+), 77 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
index e8913563e3..2bc0ada5bd 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
@@ -19,15 +19,21 @@
package org.apache.pinot.controller.helix.core.assignment.segment;
import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.SegmentUtils;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
-import org.apache.pinot.spi.utils.CommonConstants;
+import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
/**
@@ -52,26 +58,53 @@ import org.apache.pinot.spi.utils.CommonConstants;
*/
public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment
{
+ // Cache segment partition id to avoid ZK reads.
+ // NOTE:
+ // 1. This cache is used for table rebalance only, but not segment
assignment. During rebalance, rebalanceTable() can
+ // be invoked multiple times when the ideal state changes during the
rebalance process.
+ // 2. The cache won't be refreshed when an existing segment is replaced with
a segment from a different partition.
+ // Replacing a segment with a segment from a different partition should
not be allowed for upsert table because it
+ // will cause the segment to be served by the wrong servers. If this
happens during the table rebalance, another
+ // rebalance might be needed to fix the assignment.
+ private final Object2IntOpenHashMap<String> _segmentPartitionIdMap = new
Object2IntOpenHashMap<>();
+
@Override
public List<String> assignSegment(String segmentName, Map<String,
Map<String, String>> currentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance
partition type should be provided");
- Map.Entry<InstancePartitionsType, InstancePartitions>
typeToInstancePartitions =
- instancePartitionsMap.entrySet().iterator().next();
- InstancePartitionsType instancePartitionsType =
typeToInstancePartitions.getKey();
- InstancePartitions instancePartitions =
typeToInstancePartitions.getValue();
- Preconditions.checkState(instancePartitionsType ==
InstancePartitionsType.CONSUMING,
- "Only CONSUMING instance partition type is allowed for table using
upsert but got: " + instancePartitionsType);
+ InstancePartitions instancePartitions =
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+ Preconditions.checkState(instancePartitions != null, "Failed to find
CONSUMING instance partitions for table: %s",
+ _tableNameWithType);
_logger.info("Assigning segment: {} with instance partitions: {} for
table: {}", segmentName, instancePartitions,
_tableNameWithType);
- int segmentPartitionId =
- SegmentAssignmentUtils.getRealtimeSegmentPartitionId(segmentName,
_tableNameWithType, _helixManager,
- _partitionColumn);
- List<String> instancesAssigned =
assignConsumingSegment(segmentPartitionId, instancePartitions);
- // Iterate the idealState to find the first segment that's in the same
table partition with the new segment, and
- // check if their assignments are same. We try to derive the partition id
from segment name to avoid ZK reads.
- Set<String> idealAssignment = null;
- List<String> nonStandardSegments = new ArrayList<>();
+
+ int partitionId = getPartitionId(segmentName);
+ List<String> instancesAssigned = assignConsumingSegment(partitionId,
instancePartitions);
+ Set<String> existingAssignment = getExistingAssignment(partitionId,
currentAssignment);
+ // Check if the candidate assignment is consistent with existing
assignment. Use existing assignment if not.
+ if (existingAssignment == null) {
+ _logger.info("No existing assignment from idealState, using the one
decided by instancePartitions");
+ } else if (!isSameAssignment(existingAssignment, instancesAssigned)) {
+ _logger.warn("Assignment: {} is inconsistent with idealState: {}, using
the one from idealState",
+ instancesAssigned, existingAssignment);
+ instancesAssigned = new ArrayList<>(existingAssignment);
+ if (_controllerMetrics != null) {
+ _controllerMetrics.addMeteredTableValue(_tableNameWithType,
+
ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_MISMATCH, 1L);
+ }
+ }
+ _logger.info("Assigned segment: {} to instances: {} for table: {}",
segmentName, instancesAssigned,
+ _tableNameWithType);
+ return instancesAssigned;
+ }
+
+ /**
+ * Returns the existing assignment for the given partition id, or {@code
null} if there is no existing segment for the
+ * partition. We try to derive the partition id from segment name to avoid
ZK reads.
+ */
+ @Nullable
+ private Set<String> getExistingAssignment(int partitionId, Map<String,
Map<String, String>> currentAssignment) {
+ List<String> uploadedSegments = new ArrayList<>();
for (Map.Entry<String, Map<String, String>> entry :
currentAssignment.entrySet()) {
// Skip OFFLINE segments as they are not rebalanced, so their assignment
in idealState can be stale.
if (isOfflineSegment(entry.getValue())) {
@@ -79,59 +112,86 @@ public class StrictRealtimeSegmentAssignment extends
RealtimeSegmentAssignment {
}
LLCSegmentName llcSegmentName = LLCSegmentName.of(entry.getKey());
if (llcSegmentName == null) {
- nonStandardSegments.add(entry.getKey());
+ uploadedSegments.add(entry.getKey());
continue;
}
- if (llcSegmentName.getPartitionGroupId() == segmentPartitionId) {
- idealAssignment = entry.getValue().keySet();
- break;
- }
- }
- if (idealAssignment == null && !nonStandardSegments.isEmpty()) {
- if (_logger.isDebugEnabled()) {
- int segmentCnt = nonStandardSegments.size();
- if (segmentCnt <= 10) {
- _logger.debug("Check ZK metadata of {} segments: {} for any one also
from partition: {}", segmentCnt,
- nonStandardSegments, segmentPartitionId);
- } else {
- _logger.debug("Check ZK metadata of {} segments: {}... for any one
also from partition: {}", segmentCnt,
- nonStandardSegments.subList(0, 10), segmentPartitionId);
- }
- }
- // Check ZK metadata for segments with non-standard LLC segment names to
look for a segment that's in the same
- // table partition with the new segment.
- for (String nonStandardSegment : nonStandardSegments) {
- if
(SegmentAssignmentUtils.getRealtimeSegmentPartitionId(nonStandardSegment,
_tableNameWithType, _helixManager,
- _partitionColumn) == segmentPartitionId) {
- idealAssignment = currentAssignment.get(nonStandardSegment).keySet();
- break;
- }
+ if (llcSegmentName.getPartitionGroupId() == partitionId) {
+ return entry.getValue().keySet();
}
}
- // Check if the candidate assignment is consistent with idealState. Use
idealState if not.
- if (idealAssignment == null) {
- _logger.info("No existing assignment from idealState, using the one
decided by instancePartitions");
- } else if (!isSameAssignment(idealAssignment, instancesAssigned)) {
- _logger.warn("Assignment: {} is inconsistent with idealState: {}, using
the one from idealState",
- instancesAssigned, idealAssignment);
- instancesAssigned.clear();
- instancesAssigned.addAll(idealAssignment);
- if (_controllerMetrics != null) {
- _controllerMetrics.addMeteredTableValue(_tableNameWithType,
-
ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_MISMATCH, 1L);
+ // Check ZK metadata for uploaded segments to look for a segment that's in
the same partition
+ for (String uploadedSegment : uploadedSegments) {
+ if (getPartitionId(uploadedSegment) == partitionId) {
+ return currentAssignment.get(uploadedSegment).keySet();
}
}
- _logger.info("Assigned segment: {} to instances: {} for table: {}",
segmentName, instancesAssigned,
- _tableNameWithType);
- return instancesAssigned;
+ return null;
}
+ /**
+ * Returns {@code true} if all instances are OFFLINE (neither ONLINE nor
CONSUMING), {@code false} otherwise.
+ */
+ private boolean isOfflineSegment(Map<String, String> instanceStateMap) {
+ return !instanceStateMap.containsValue(SegmentStateModel.ONLINE) &&
!instanceStateMap.containsValue(
+ SegmentStateModel.CONSUMING);
+ }
+
+ /**
+ * Returns the partition id of the given segment.
+ */
+ private int getPartitionId(String segmentName) {
+ Integer partitionId =
+ SegmentUtils.getRealtimeSegmentPartitionId(segmentName,
_tableNameWithType, _helixManager, _partitionColumn);
+ Preconditions.checkState(partitionId != null, "Failed to find partition id
for segment: %s of table: %s",
+ segmentName, _tableNameWithType);
+ return partitionId;
+ }
+
+ /**
+ * Returns {@code true} if the ideal assignment and the actual assignment
are the same, {@code false} otherwise.
+ */
private boolean isSameAssignment(Set<String> idealAssignment, List<String>
instancesAssigned) {
return idealAssignment.size() == instancesAssigned.size() &&
idealAssignment.containsAll(instancesAssigned);
}
- private boolean isOfflineSegment(Map<String, String> instanceStateMap) {
- return
!instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)
- &&
!instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING);
+ @Override
+ public Map<String, Map<String, String>> rebalanceTable(Map<String,
Map<String, String>> currentAssignment,
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
@Nullable List<Tier> sortedTiers,
+ @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap,
RebalanceConfig config) {
+ Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance
partition type should be provided");
+ InstancePartitions instancePartitions =
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+ Preconditions.checkState(instancePartitions != null, "Failed to find
CONSUMING instance partitions for table: %s",
+ _tableNameWithType);
+ Preconditions.checkArgument(config.isIncludeConsuming(),
+ "Consuming segment must be included when rebalancing upsert table:
%s", _tableNameWithType);
+ Preconditions.checkState(sortedTiers == null, "Tiers must not be specified
for upsert table: %s",
+ _tableNameWithType);
+ _logger.info("Rebalancing table: {} with instance partitions: {}",
_tableNameWithType, instancePartitions);
+
+ Map<String, Map<String, String>> newAssignment = new TreeMap<>();
+ for (Map.Entry<String, Map<String, String>> entry :
currentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> instanceStateMap = entry.getValue();
+ if (isOfflineSegment(instanceStateMap)) {
+ // Leave the OFFLINE segments in place, and
RealtimeSegmentValidationManager will periodically detect the
+ // OFFLINE segments and re-assign them
+ newAssignment.put(segmentName, instanceStateMap);
+ } else {
+ // Reassign CONSUMING and COMPLETED segments
+ List<String> instancesAssigned =
+ assignConsumingSegment(getPartitionIdUsingCache(segmentName),
instancePartitions);
+ String state =
instanceStateMap.containsValue(SegmentStateModel.CONSUMING) ?
SegmentStateModel.CONSUMING
+ : SegmentStateModel.ONLINE;
+ newAssignment.put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, state));
+ }
+ }
+ return newAssignment;
+ }
+
+ /**
+ * Returns the partition id of the given segment, using the cached partition id
if it exists.
+ */
+ private int getPartitionIdUsingCache(String segmentName) {
+ return _segmentPartitionIdMap.computeIntIfAbsent(segmentName,
this::getPartitionId);
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index a052794ae2..cd4d70ee3c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -58,6 +58,7 @@ import
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import
org.apache.pinot.controller.helix.core.assignment.segment.StrictRealtimeSegmentAssignment;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -326,14 +327,18 @@ public class TableRebalancer {
targetAssignment);
// Calculate the min available replicas for no-downtime rebalance
- // NOTE: The calculation is based on the number of replicas of the target
assignment. In case of increasing the
- // number of replicas for the current assignment, the current
instance state map might not have enough
- // replicas to reach the minimum available replicas requirement. In
this scenario we don't want to fail the
- // check, but keep all the current instances as this is the best we
can do, and can help the table get out of
- // this state.
+ // NOTE:
+ // 1. The calculation is based on the number of replicas of the target
assignment. In case of increasing the number
+ // of replicas for the current assignment, the current instance state
map might not have enough replicas to reach
+ // the minimum available replicas requirement. In this scenario we
don't want to fail the check, but keep all the
+ // current instances as this is the best we can do, and can help the
table get out of this state.
+ // 2. Only check the segments to be moved because we don't need to
maintain available replicas for segments not
+ // being moved, including segments with all replicas OFFLINE (error
segments during consumption).
+ Set<String> segmentsToMove =
SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
+
int numReplicas = Integer.MAX_VALUE;
- for (Map<String, String> instanceStateMap : targetAssignment.values()) {
- numReplicas = Math.min(instanceStateMap.size(), numReplicas);
+ for (String segment : segmentsToMove) {
+ numReplicas = Math.min(targetAssignment.get(segment).size(),
numReplicas);
}
int minAvailableReplicas;
if (minReplicasToKeepUpForNoDowntime >= 0) {
@@ -362,9 +367,17 @@ public class TableRebalancer {
externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs);
int expectedVersion = currentIdealState.getRecord().getVersion();
+ // We repeat the following steps until the target assignment is reached:
+ // 1. Wait for ExternalView to converge with the IdealState. Fail the
rebalance if it doesn't converge within the
+ // timeout.
+ // 2. When IdealState changes during step 1, re-calculate the target
assignment based on the new IdealState (current
+ // assignment).
+ // 3. Check if the target assignment is reached. Rebalance is done if it
is reached.
+ // 4. Calculate the next assignment based on the current assignment,
target assignment and min available replicas.
+ // 5. Update the IdealState to the next assignment. If the IdealState
changes before the update, go back to step 1.
while (true) {
// Wait for ExternalView to converge before updating the next IdealState
- Set<String> segmentsToMove =
SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
+ segmentsToMove =
SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
IdealState idealState;
try {
idealState =
@@ -399,16 +412,22 @@ public class TableRebalancer {
// If all the segments to be moved remain unchanged (same instance
state map) in the new ideal state, apply the
// same target instance state map for these segments to the new ideal
state as the target assignment
boolean segmentsToMoveChanged = false;
- for (String segment : segmentsToMove) {
- Map<String, String> oldInstanceStateMap = oldAssignment.get(segment);
- Map<String, String> currentInstanceStateMap =
currentAssignment.get(segment);
- if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
- LOGGER.info(
- "For rebalanceId: {}, segment state changed in IdealState
from: {} to: {} for table: {}, segment: {}, "
- + "re-calculating the target assignment based on the new
IdealState", rebalanceJobId,
- oldInstanceStateMap, currentInstanceStateMap,
tableNameWithType, segment);
- segmentsToMoveChanged = true;
- break;
+ if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
+ // For StrictRealtimeSegmentAssignment, we need to recompute the
target assignment because the assignment for
+ // newly added segments is based on the existing assignment
+ segmentsToMoveChanged = true;
+ } else {
+ for (String segment : segmentsToMove) {
+ Map<String, String> oldInstanceStateMap =
oldAssignment.get(segment);
+ Map<String, String> currentInstanceStateMap =
currentAssignment.get(segment);
+ // TODO: Consider allowing segment state change from CONSUMING to
ONLINE
+ if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
+ LOGGER.info("For rebalanceId: {}, segment state changed in
IdealState from: {} to: {} for table: {}, "
+ + "segment: {}, re-calculating the target assignment
based on the new IdealState", rebalanceJobId,
+ oldInstanceStateMap, currentInstanceStateMap,
tableNameWithType, segment);
+ segmentsToMoveChanged = true;
+ break;
+ }
}
}
if (segmentsToMoveChanged) {
@@ -417,8 +436,7 @@ public class TableRebalancer {
instancePartitionsMap =
getInstancePartitionsMap(tableConfig, reassignInstances,
bootstrap, false).getLeft();
tierToInstancePartitionsMap =
- getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap,
- dryRun).getLeft();
+ getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap, false).getLeft();
targetAssignment =
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap,
sortedTiers,
tierToInstancePartitionsMap, rebalanceConfig);
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]