This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new eb6b0718c94 Fix NPE in TableRebalancer (#17723)
eb6b0718c94 is described below
commit eb6b0718c945cb54e4e0ce241875f9da8323f6af
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Feb 19 13:27:55 2026 -0800
Fix NPE in TableRebalancer (#17723)
---
.../core/rebalance/DefaultRebalancePreChecker.java | 2 +-
.../helix/core/rebalance/RebalancePreChecker.java | 2 +-
.../helix/core/rebalance/TableRebalancer.java | 28 ++++++++++++++--------
3 files changed, 20 insertions(+), 12 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
index 5aec4e82158..78da0e58b39 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
@@ -344,7 +344,7 @@ public class DefaultRebalancePreChecker implements
RebalancePreChecker {
private RebalancePreCheckerResult checkRebalanceConfig(RebalanceConfig
rebalanceConfig, TableConfig tableConfig,
Map<String, Map<String, String>> currentAssignment, Map<String,
Map<String, String>> targetAssignment,
- RebalanceSummaryResult rebalanceSummaryResult) {
+ @Nullable RebalanceSummaryResult rebalanceSummaryResult) {
List<String> warnings = new ArrayList<>();
boolean pass = true;
if (rebalanceConfig.isBestEfforts()) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
index 91e80bb497d..b8f4a0f9544 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
@@ -43,7 +43,7 @@ public interface RebalancePreChecker {
public PreCheckContext(String rebalanceJobId, String tableNameWithType,
TableConfig tableConfig,
Map<String, Map<String, String>> currentAssignment, Map<String,
Map<String, String>> targetAssignment,
@Nullable TableSizeReader.TableSubTypeSizeDetails
tableSubTypeSizeDetails, RebalanceConfig rebalanceConfig,
- RebalanceSummaryResult rebalanceSummaryResult) {
+ @Nullable RebalanceSummaryResult rebalanceSummaryResult) {
_rebalanceJobId = rebalanceJobId;
_tableNameWithType = tableNameWithType;
_tableConfig = tableConfig;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index a292e396ce5..8fd9bede6fc 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -486,7 +486,10 @@ public class TableRebalancer {
List<String> segmentsToMove =
SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
Set<String> segmentsToMonitor = new HashSet<>(segmentsToMove);
- long estimatedAverageSegmentSizeInBytes =
summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes();
+ long estimatedAverageSegmentSizeInBytes = 0;
+ if (summaryResult != null) {
+ estimatedAverageSegmentSizeInBytes =
summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes();
+ }
Set<String> allSegmentsFromIdealState = currentAssignment.keySet();
TableRebalanceObserver.RebalanceContext rebalanceContext = new
TableRebalanceObserver.RebalanceContext(
estimatedAverageSegmentSizeInBytes, allSegmentsFromIdealState,
segmentsToMonitor);
@@ -580,7 +583,13 @@ public class TableRebalancer {
//
// NOTE: Monitor the segments to be moved from both the previous round and
this round to ensure the moved segments
// in the previous round are also converged.
+ List<String> oldSegmentsToMove = segmentsToMove;
while (true) {
+ boolean segmentsToMoveChanged = oldSegmentsToMove != segmentsToMove;
+ if (segmentsToMoveChanged) {
+ oldSegmentsToMove = segmentsToMove;
+ segmentsToMonitor.addAll(segmentsToMove);
+ }
// Wait for ExternalView to converge before updating the next IdealState
IdealState idealState;
try {
@@ -601,6 +610,9 @@ public class TableRebalancer {
"Caught exception while waiting for ExternalView to converge: " +
e, instancePartitionsMap,
tierToInstancePartitionsMap, targetAssignment, preChecksResult,
summaryResult);
}
+ if (segmentsToMoveChanged) {
+ segmentsToMonitor = new HashSet<>(segmentsToMove);
+ }
// Re-calculate the target assignment if IdealState changed while
waiting for ExternalView to converge
ZNRecord idealStateRecord = idealState.getRecord();
@@ -621,7 +633,7 @@ public class TableRebalancer {
// If all the segments to be moved remain unchanged (same instance
state map) in the new ideal state, apply
// the same target instance state map for these segments to the new
ideal state as the target assignment
- boolean segmentsToMoveChanged = false;
+ segmentsToMoveChanged = false;
if (segmentAssignment instanceof
BaseStrictRealtimeSegmentAssignment) {
// For StrictRealtimeSegmentAssignment, we need to recompute the
target assignment because the assignment
// for new added segments is based on the existing assignment
@@ -652,6 +664,7 @@ public class TableRebalancer {
minimizeDataMovement, tableRebalanceLogger).getLeft();
targetAssignment =
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap,
sortedTiers,
tierToInstancePartitionsMap, rebalanceConfig);
+ segmentsToMove =
SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
} catch (Exception e) {
onReturnFailure("Caught exception while re-calculating the
target assignment, aborting the rebalance", e,
tableRebalanceLogger);
@@ -773,28 +786,23 @@ public class TableRebalancer {
Preconditions.checkState(_helixDataAccessor.getBaseDataAccessor()
.set(idealStatePropertyKey.getPath(), idealStateRecord,
expectedVersion, AccessOption.PERSISTENT),
"Failed to update IdealState");
- currentAssignment = nextAssignment;
expectedVersion++;
+ currentAssignment = nextAssignment;
+ segmentsToMove =
SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
// IdealState update is successful. Update the segment list as the
IDEAL_STATE_CHANGE_TRIGGER should have
// captured the newly added / deleted segments
allSegmentsFromIdealState = currentAssignment.keySet();
tableRebalanceLogger.info("Successfully updated the IdealState");
} catch (ZkBadVersionException e) {
tableRebalanceLogger.info("Version changed while updating IdealState");
- // Since IdealState wasn't updated, rollback the stats changes made
and continue. There is no need to update
- // segmentsToMonitor either since that hasn't changed without the
IdealState update
+ // Since IdealState wasn't updated, rollback the stats changes made
_tableRebalanceObserver.onRollback();
- continue;
} catch (Exception e) {
onReturnFailure("Caught exception while updating IdealState, aborting
the rebalance", e, tableRebalanceLogger);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
"Caught exception while updating IdealState: " + e,
instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment, preChecksResult, summaryResult);
}
-
- segmentsToMonitor = new HashSet<>(segmentsToMove);
- segmentsToMove =
SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
- segmentsToMonitor.addAll(segmentsToMove);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]