This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4154559944a Fix rebalance logic to treat COMMITTING segments as CONSUMING (#16348)
4154559944a is described below
commit 4154559944ac29e042900a6a622ea0f04b574d8f
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Jul 16 15:08:43 2025 -0600
Fix rebalance logic to treat COMMITTING segments as CONSUMING (#16348)
---
.../segment/RealtimeSegmentAssignment.java | 14 +++-
.../assignment/segment/SegmentAssignmentUtils.java | 49 +++++++++----
.../realtime/PinotLLCRealtimeSegmentManager.java | 82 +++++++++++++---------
.../core/realtime/SegmentCompletionManager.java | 1 -
4 files changed, 97 insertions(+), 49 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index 4909c4b1169..80ed9cdc292 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -20,17 +20,21 @@ package org.apache.pinot.controller.helix.core.assignment.segment;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.tier.Tier;
+import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.common.utils.SegmentUtils;
import
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy;
import
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory;
+import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
@@ -192,8 +196,16 @@ public class RealtimeSegmentAssignment extends BaseSegmentAssignment {
+ "includeConsuming: {}, bootstrap: {}", _tableNameWithType,
completedInstancePartitions,
consumingInstancePartitions, includeConsuming, bootstrap);
+ Set<String> committingSegments = null;
+ if (PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) {
+ List<String> committingSegmentList =
PinotLLCRealtimeSegmentManager.getCommittingSegments(_tableNameWithType,
+ _helixManager.getHelixPropertyStore());
+ if (!committingSegmentList.isEmpty()) {
+ committingSegments = new HashSet<>(committingSegmentList);
+ }
+ }
SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment
completedConsumingOfflineSegmentAssignment =
- new
SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(nonTierAssignment);
+ new
SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(nonTierAssignment,
committingSegments);
Map<String, Map<String, String>> newAssignment;
// Reassign COMPLETED segments first
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 027f8defba9..68ca22fbda4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -29,6 +29,8 @@ import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -37,6 +39,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.tier.Tier;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.Pairs;
@@ -387,19 +390,38 @@ public class SegmentAssignmentUtils {
private final Map<String, Map<String, String>> _offlineSegmentAssignment =
new TreeMap<>();
// NOTE: split the segments based on the following criteria:
- // 1. At least one instance ONLINE -> COMPLETED segment
- // 2. At least one instance CONSUMING -> CONSUMING segment
+ // 1. At least one instance ONLINE && segment is not COMMITTING ->
COMPLETED segment
+ // 2. At least one instance CONSUMING || segment is COMMITTING ->
CONSUMING segment
// 3. All instances OFFLINE (all instances encountered error while
consuming) -> OFFLINE segment
- CompletedConsumingOfflineSegmentAssignment(Map<String, Map<String,
String>> segmentAssignment) {
- for (Map.Entry<String, Map<String, String>> entry :
segmentAssignment.entrySet()) {
- String segmentName = entry.getKey();
- Map<String, String> instanceStateMap = entry.getValue();
- if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
- _completedSegmentAssignment.put(segmentName, instanceStateMap);
- } else if
(instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
- _consumingSegmentAssignment.put(segmentName, instanceStateMap);
- } else {
- _offlineSegmentAssignment.put(segmentName, instanceStateMap);
+ CompletedConsumingOfflineSegmentAssignment(Map<String, Map<String,
String>> segmentAssignment,
+ @Nullable Set<String> committingSegments) {
+ if (CollectionUtils.isEmpty(committingSegments)) {
+ for (Map.Entry<String, Map<String, String>> entry :
segmentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> instanceStateMap = entry.getValue();
+ if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
+ _completedSegmentAssignment.put(segmentName, instanceStateMap);
+ } else if
(instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
+ _consumingSegmentAssignment.put(segmentName, instanceStateMap);
+ } else {
+ _offlineSegmentAssignment.put(segmentName, instanceStateMap);
+ }
+ }
+ } else {
+ for (Map.Entry<String, Map<String, String>> entry :
segmentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> instanceStateMap = entry.getValue();
+ if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
+ if (committingSegments.contains(segmentName)) {
+ _consumingSegmentAssignment.put(segmentName, instanceStateMap);
+ } else {
+ _completedSegmentAssignment.put(segmentName, instanceStateMap);
+ }
+ } else if
(instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
+ _consumingSegmentAssignment.put(segmentName, instanceStateMap);
+ } else {
+ _offlineSegmentAssignment.put(segmentName, instanceStateMap);
+ }
}
}
}
@@ -452,7 +474,8 @@ public class SegmentAssignmentUtils {
// find an eligible tier for the segment, from the ordered list of
tiers
SegmentZKMetadata segmentZKMetadata =
ZKMetadataProvider.getSegmentZKMetadata(propertyStore,
tableNameWithType, segmentName);
- if (segmentZKMetadata != null) {
+ // Skip COMMITTING segments
+ if (segmentZKMetadata != null && segmentZKMetadata.getStatus() !=
Status.COMMITTING) {
for (Tier tier : sortedTiers) {
if (tier.getSegmentSelector().selectSegment(tableNameWithType,
segmentZKMetadata)) {
_tierNameToSegmentAssignmentMap.get(tier.getName()).put(segmentName,
instanceStateMap);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 5f7c9501785..57cff126e40 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
@@ -484,10 +485,9 @@ public class PinotLLCRealtimeSegmentManager {
Stat stat = new Stat();
ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat,
AccessOption.PERSISTENT);
int expectedVersion = stat.getVersion();
- LOGGER.info("Committing segments list size: {} before adding the segment:
{}", Optional.ofNullable(znRecord)
- .map(record -> record.getListField(COMMITTING_SEGMENTS))
- .map(List::size)
- .orElse(0), segmentName);
+ LOGGER.info("Committing segments list size: {} before adding the segment:
{}",
+ Optional.ofNullable(znRecord).map(record ->
record.getListField(COMMITTING_SEGMENTS)).map(List::size).orElse(0),
+ segmentName);
// empty ZN record for the table
if (znRecord == null) {
@@ -521,10 +521,9 @@ public class PinotLLCRealtimeSegmentManager {
Stat stat = new Stat();
ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat,
AccessOption.PERSISTENT);
- LOGGER.info("Committing segments list size: {} before removing the
segment: {}", Optional.ofNullable(znRecord)
- .map(record -> record.getListField(COMMITTING_SEGMENTS))
- .map(List::size)
- .orElse(0), segmentName);
+ LOGGER.info("Committing segments list size: {} before removing the
segment: {}",
+ Optional.ofNullable(znRecord).map(record ->
record.getListField(COMMITTING_SEGMENTS)).map(List::size).orElse(0),
+ segmentName);
if (znRecord == null || znRecord.getListField(COMMITTING_SEGMENTS) == null
|| !znRecord.getListField(
COMMITTING_SEGMENTS).contains(segmentName)) {
@@ -806,8 +805,7 @@ public class PinotLLCRealtimeSegmentManager {
} else {
LOGGER.info(
"Skipping creation of new segment metadata after segment: {}
during commit. Reason: Partition ID: {} not "
- + "found in upstream metadata.",
- committingSegmentName, committingSegmentPartitionGroupId);
+ + "found in upstream metadata.", committingSegmentName,
committingSegmentPartitionGroupId);
}
} else {
LOGGER.info(
@@ -1186,12 +1184,10 @@ public class PinotLLCRealtimeSegmentManager {
prevSegmentZKMetadata.setSizeThresholdToFlushSegment(newNumRows);
persistSegmentZKMetadata(realtimeTableName, prevSegmentZKMetadata,
stat.getVersion());
- _helixResourceManager.resetSegment(
- realtimeTableName, segmentName, null);
- LOGGER.info("Reduced segment size of {} from prevTarget {} prevActual {}
to {}",
- segmentName, prevTargetNumRows, prevNumRows, newNumRows);
- _controllerMetrics.addMeteredTableValue(
- realtimeTableName, ControllerMeter.SEGMENT_SIZE_AUTO_REDUCTION, 1L);
+ _helixResourceManager.resetSegment(realtimeTableName, segmentName, null);
+ LOGGER.info("Reduced segment size of {} from prevTarget {} prevActual {}
to {}", segmentName, prevTargetNumRows,
+ prevNumRows, newNumRows);
+ _controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.SEGMENT_SIZE_AUTO_REDUCTION, 1L);
}
/**
@@ -1272,8 +1268,8 @@ public class PinotLLCRealtimeSegmentManager {
boolean offsetsHaveToChange = offsetCriteria != null;
if (isTableEnabled && !isTablePaused) {
List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
- offsetsHaveToChange
- ? Collections.emptyList() // offsets from metadata are not
valid anymore; fetch for all partitions
+ offsetsHaveToChange ? Collections.emptyList()
+ // offsets from metadata are not valid anymore; fetch for
all partitions
: getPartitionGroupConsumptionStatusList(idealState,
streamConfigs);
// FIXME: Right now, we assume topics are sharing same offset
criteria
OffsetCriteria originalOffsetCriteria =
streamConfigs.get(0).getOffsetCriteria();
@@ -1628,8 +1624,8 @@ public class PinotLLCRealtimeSegmentManager {
// Do not create new CONSUMING segment when the stream partition has
reached end of life.
if (!partitionIdToSmallestOffset.containsKey(partitionId)) {
- LOGGER.info("PartitionGroup: {} has reached end of life. Skipping
creation of new segment {}",
- partitionId, latestSegmentName);
+ LOGGER.info("PartitionGroup: {} has reached end of life. Skipping
creation of new segment {}", partitionId,
+ latestSegmentName);
continue;
}
@@ -1722,8 +1718,8 @@ public class PinotLLCRealtimeSegmentManager {
instancePartitionsMap);
}
- private Map<Integer, StreamPartitionMsgOffset>
fetchPartitionGroupIdToSmallestOffset(
- List<StreamConfig> streamConfigs, IdealState idealState) {
+ private Map<Integer, StreamPartitionMsgOffset>
fetchPartitionGroupIdToSmallestOffset(List<StreamConfig> streamConfigs,
+ IdealState idealState) {
Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset =
new HashMap<>();
for (StreamConfig streamConfig : streamConfigs) {
List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
@@ -2496,10 +2492,8 @@ public class PinotLLCRealtimeSegmentManager {
}
}
-
if (segmentsInErrorStateInAtLeastOneReplica.isEmpty()) {
- _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
- ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 0);
+ _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 0);
_controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 0);
return;
@@ -2518,12 +2512,12 @@ public class PinotLLCRealtimeSegmentManager {
return;
} else {
LOGGER.info("Repairing error segments in table: {}.", realtimeTableName);
- _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
- ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
segmentsInErrorStateInAllReplicas.size());
+ _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
+ segmentsInErrorStateInAllReplicas.size());
}
for (String segmentName : segmentsInErrorStateInAtLeastOneReplica) {
- SegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(realtimeTableName, segmentName);
+ SegmentZKMetadata segmentZKMetadata =
_helixResourceManager.getSegmentZKMetadata(realtimeTableName, segmentName);
if (segmentZKMetadata == null) {
LOGGER.warn("Segment metadata not found for segment: {} in table: {},
skipping repairing it", segmentName,
realtimeTableName);
@@ -2704,6 +2698,20 @@ public class PinotLLCRealtimeSegmentManager {
});
}
+ public List<String> getCommittingSegments(String realtimeTableName) {
+ return getCommittingSegments(realtimeTableName, _propertyStore,
_helixResourceManager::getSegmentZKMetadata);
+ }
+
+ private List<String> getCommittingSegments(String realtimeTableName,
Collection<String> segmentsToCheck) {
+ return getCommittingSegments(realtimeTableName, segmentsToCheck,
_helixResourceManager::getSegmentZKMetadata);
+ }
+
+ public static List<String> getCommittingSegments(String realtimeTableName,
+ ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ return getCommittingSegments(realtimeTableName, propertyStore,
+ (t, s) -> ZKMetadataProvider.getSegmentZKMetadata(propertyStore, t,
s));
+ }
+
/**
* Retrieves and filters the list of committing segments for a realtime
table from the property store.
* This method:
@@ -2714,27 +2722,33 @@ public class PinotLLCRealtimeSegmentManager {
* @param realtimeTableName The name of the realtime table to fetch
committing segments for
* @return Filtered list of committing segments
*/
- public List<String> getCommittingSegments(String realtimeTableName) {
+ private static List<String> getCommittingSegments(String realtimeTableName,
+ ZkHelixPropertyStore<ZNRecord> propertyStore, BiFunction<String, String,
SegmentZKMetadata> zkMetadataProvider) {
String pauselessDebugMetadataPath =
ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
- ZNRecord znRecord = _propertyStore.get(pauselessDebugMetadataPath, null,
AccessOption.PERSISTENT);
+ ZNRecord znRecord = propertyStore.get(pauselessDebugMetadataPath, null,
AccessOption.PERSISTENT);
if (znRecord == null) {
return List.of();
}
- return getCommittingSegments(realtimeTableName,
znRecord.getListField(COMMITTING_SEGMENTS));
+ List<String> committingSegments =
znRecord.getListField(COMMITTING_SEGMENTS);
+ if (committingSegments == null) {
+ return List.of();
+ }
+ return getCommittingSegments(realtimeTableName, committingSegments,
zkMetadataProvider);
}
/**
* Returns the list of segments that are in COMMITTING state. Filters out
segments that are either deleted or no
* longer in COMMITTING state.
*/
- private List<String> getCommittingSegments(String realtimeTableName,
@Nullable Collection<String> segmentsToCheck) {
- if (CollectionUtils.isEmpty(segmentsToCheck)) {
+ private static List<String> getCommittingSegments(String realtimeTableName,
Collection<String> segmentsToCheck,
+ BiFunction<String, String, SegmentZKMetadata> zkMetadataProvider) {
+ if (segmentsToCheck.isEmpty()) {
return List.of();
}
List<String> committingSegments = new ArrayList<>(segmentsToCheck.size());
for (String segment : segmentsToCheck) {
- SegmentZKMetadata segmentZKMetadata =
_helixResourceManager.getSegmentZKMetadata(realtimeTableName, segment);
+ SegmentZKMetadata segmentZKMetadata =
zkMetadataProvider.apply(realtimeTableName, segment);
if (segmentZKMetadata != null && segmentZKMetadata.getStatus() ==
Status.COMMITTING) {
committingSegments.add(segment);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 7607f2114a0..520daaeea3b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -127,7 +127,6 @@ public class SegmentCompletionManager {
String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(llcSegmentName.getTableName());
String segmentName = llcSegmentName.getSegmentName();
SegmentZKMetadata segmentMetadata =
_segmentManager.getSegmentZKMetadata(realtimeTableName, segmentName, null);
- Preconditions.checkState(segmentMetadata != null, "Failed to find ZK
metadata for segment: %s", segmentName);
TableConfig tableConfig =
_segmentManager.getTableConfig(realtimeTableName);
String factoryName = null;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]