somandal commented on code in PR #16096:
URL: https://github.com/apache/pinot/pull/16096#discussion_r2152596004
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -2185,7 +2185,7 @@ private void processBatchesSequentially(List<Set<String>> segmentBatchList, Stri
}
}
- private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit,
+ public void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit,
Review Comment:
This feels like a very internal implementation detail of how forceCommit works in batches, and IMO it shouldn't be exposed as public. Consider creating a utility function with an appropriate name in a common location and calling that from both code paths instead. Alternatively, consider relying on the job metadata tracked in ZK (similar to how the forceCommit job status can be fetched).
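A minimal sketch of the shared-utility idea. It assumes the wait reduces to polling a per-segment "has this committed yet?" check until a timeout. The class `SegmentCommitWaitUtils`, the method `waitForSegmentsToCommit`, and its predicate parameter are hypothetical names, not existing Pinot APIs. In the controller, the predicate would presumably consult segment ZK metadata or the ideal state.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

/**
 * Hypothetical shared helper (not an existing Pinot class) that polls until every segment in a
 * batch satisfies a caller-supplied "committed" check, or until a timeout elapses.
 */
final class SegmentCommitWaitUtils {
  private SegmentCommitWaitUtils() {
  }

  /**
   * Returns the segments still pending when the wait ends; an empty set means the whole batch
   * committed before the timeout.
   */
  static Set<String> waitForSegmentsToCommit(Set<String> segmentBatch,
      Predicate<String> isSegmentCommitted, long checkIntervalMs, long timeoutMs) {
    Set<String> pending = new HashSet<>(segmentBatch);
    long deadlineMs = System.currentTimeMillis() + timeoutMs;
    while (true) {
      // Drop every segment the check now reports as committed.
      pending.removeIf(isSegmentCommitted);
      if (pending.isEmpty() || System.currentTimeMillis() >= deadlineMs) {
        return pending;
      }
      try {
        Thread.sleep(checkIntervalMs);
      } catch (InterruptedException e) {
        // Preserve the interrupt flag and report whatever is still pending.
        Thread.currentThread().interrupt();
        return pending;
      }
    }
  }

  public static void main(String[] args) {
    // Simulated status check in which only seg__0__1 ever finishes committing.
    Set<String> stillPending =
        waitForSegmentsToCommit(Set.of("seg__0__1", "seg__1__1"), "seg__0__1"::equals, 10, 50);
    System.out.println(stillPending); // prints [seg__1__1]
  }
}
```

Returning the still-pending set instead of throwing lets each caller decide whether a timeout is fatal. That keeps the helper usable from both the force-commit batching path and the rebalancer.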
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
_availableInstances = availableInstances;
}
}
+
+ @VisibleForTesting
+ static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+ Map<String, Map<String, String>> targetAssignment) {
+ Set<String> movingConsumingSegments = new HashSet<>();
+ for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> currentInstanceStateMap = entry.getValue();
+ Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+ if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+ .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+ .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+ if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+ movingConsumingSegments.add(segmentName);
+ }
+ }
+ }
+ return movingConsumingSegments;
+ }
+
+ private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+ @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+ if (_pinotLLCRealtimeSegmentManager != null) {
+ ForceCommitBatchConfig forceCommitBatchConfig =
+ ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
Review Comment:
I kind of think we should expose these as rebalance config options in case
users want to provide their own values for whatever reason as limitations on
how many to forceCommit at a time etc. Can choose some good defaults and +1 to
Yash's comment on using constants
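This is a sketch of exposing the batching knobs with named defaults instead of inline literals. It assumes the three arguments to `ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180)` are batch size, status-check interval (seconds), and status-check timeout (seconds). That reading, the holder class `ForceCommitRebalanceOptions`, and the `DEFAULT_*` constant names are all illustrative assumptions. In the actual change, these would more likely be fields on `RebalanceConfig` passed straight into `ForceCommitBatchConfig.of(...)`.

```java
/**
 * Hypothetical holder for force-commit batching options exposed through the rebalance config.
 * The defaults mirror the literals currently passed to ForceCommitBatchConfig.of(...), assuming
 * they mean (batchSize, statusCheckIntervalSec, statusCheckTimeoutSec).
 */
final class ForceCommitRebalanceOptions {
  static final int DEFAULT_BATCH_SIZE = Integer.MAX_VALUE;
  static final int DEFAULT_STATUS_CHECK_INTERVAL_SEC = 5;
  static final int DEFAULT_STATUS_CHECK_TIMEOUT_SEC = 180;

  final int batchSize;
  final int statusCheckIntervalSec;
  final int statusCheckTimeoutSec;

  ForceCommitRebalanceOptions(int batchSize, int statusCheckIntervalSec, int statusCheckTimeoutSec) {
    // Reject non-positive values up front rather than partway through a rebalance.
    if (batchSize <= 0 || statusCheckIntervalSec <= 0 || statusCheckTimeoutSec <= 0) {
      throw new IllegalArgumentException("Force-commit batch options must all be positive");
    }
    this.batchSize = batchSize;
    this.statusCheckIntervalSec = statusCheckIntervalSec;
    this.statusCheckTimeoutSec = statusCheckTimeoutSec;
  }

  /** Options used when the user does not override anything. */
  static ForceCommitRebalanceOptions defaults() {
    return new ForceCommitRebalanceOptions(DEFAULT_BATCH_SIZE, DEFAULT_STATUS_CHECK_INTERVAL_SEC,
        DEFAULT_STATUS_CHECK_TIMEOUT_SEC);
  }

  public static void main(String[] args) {
    System.out.println(defaults().statusCheckTimeoutSec); // prints 180
    // A user-supplied override that force-commits at most 10 segments per batch.
    System.out.println(new ForceCommitRebalanceOptions(10, 5, 180).batchSize); // prints 10
  }
}
```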
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -665,6 +665,8 @@ public RebalanceResult rebalance(
@QueryParam("retryInitialDelayInMs") long retryInitialDelayInMs,
@ApiParam(value = "Whether to update segment target tier as part of the rebalance") @DefaultValue("false")
@QueryParam("updateTargetTier") boolean updateTargetTier,
+ @ApiParam(value = "Do force commit on consuming segments before they are rebalanced") @DefaultValue("false")
+ @QueryParam("forceCommitBeforeMoved") boolean forceCommitBeforeMoved,
Review Comment:
Can we simplify the name even more, e.g. `forceCommit`?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -231,14 +237,18 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
boolean enableStrictReplicaGroup = tableConfig.getRoutingConfig() != null
&& RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
tableConfig.getRoutingConfig().getInstanceSelectorType());
+ boolean forceCommitBeforeMoved =
+ tableConfig.getTableType() == TableType.REALTIME && rebalanceConfig.isForceCommitBeforeMoved();
tableRebalanceLogger.info(
"Start rebalancing with dryRun: {}, preChecks: {}, reassignInstances: {}, "
+ "includeConsuming: {}, bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, "
+ "enableStrictReplicaGroup: {}, lowDiskMode: {}, bestEfforts: {}, batchSizePerServer: {}, "
- + "externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}, minimizeDataMovement: {}",
+ + "externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}, minimizeDataMovement: {}, "
+ + "forceCommitBeforeMoved: {}",
dryRun, preChecks, reassignInstances, includeConsuming, bootstrap, downtime,
minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, lowDiskMode, bestEfforts, batchSizePerServer,
- externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs, minimizeDataMovement);
+ externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs, minimizeDataMovement,
+ forceCommitBeforeMoved);
Review Comment:
Let's add validation that this is not enabled for `OFFLINE` tables, or internally set it to `false` for `OFFLINE` tables with a warning log. We don't validate `includeConsuming` either, so I'm okay with the log message and internally setting this to `false` if that's better.
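Here is a minimal sketch of the "internally set to `false` with a warning" option. The nested `TableType` enum stands in for Pinot's own `TableType`, and `resolveForceCommit` is a hypothetical helper. The warning is passed in as a `Consumer<String>` so the sketch has no logging dependency. The real code would call `tableRebalanceLogger.warn(...)` instead.

```java
import java.util.function.Consumer;

/** Hypothetical helper; the real code would log through tableRebalanceLogger (SLF4J). */
final class ForceCommitFlagResolver {
  /** Stand-in for Pinot's TableType enum. */
  enum TableType { OFFLINE, REALTIME }

  /** Returns the effective flag, downgrading it to false (with a warning) for non-REALTIME tables. */
  static boolean resolveForceCommit(TableType tableType, boolean requested, Consumer<String> warn) {
    if (requested && tableType != TableType.REALTIME) {
      warn.accept("forceCommitBeforeMoved only applies to REALTIME tables; setting it to false for "
          + tableType + " table");
      return false;
    }
    return requested;
  }

  public static void main(String[] args) {
    Consumer<String> warn = msg -> System.out.println("WARN: " + msg);
    System.out.println(resolveForceCommit(TableType.OFFLINE, true, warn)); // warns, then prints false
    System.out.println(resolveForceCommit(TableType.REALTIME, true, warn)); // prints true
  }
}
```

Downgrading with a warning keeps behavior consistent with `includeConsuming`, which is also not validated. A hard validation error would be the stricter alternative.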
--
This is an automated message from the Apache Git Service.