somandal commented on code in PR #16096:
URL: https://github.com/apache/pinot/pull/16096#discussion_r2153381331
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -410,6 +410,11 @@ private RebalancePreCheckerResult checkRebalanceConfig(RebalanceConfig rebalance
       }
     }
+    if (tableConfig.getTableType() == TableType.OFFLINE && rebalanceConfig.isForceCommitBeforeMoved()) {
+      pass = false;
+      warnings.add("forceCommitBeforeMoved is set for OFFLINE table, which will be ignored.");
Review Comment:
Let's rename this to match the parameter name.
##########
pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerOptions.ts:
##########
@@ -174,5 +174,35 @@ export const rebalanceServerOptions: RebalanceServerOption[] = [
     "isAdvancedConfig": true,
     "isStatsGatheringConfig": false,
     "markWithWarningIcon": false
+  },
+  {
+    "name": "forceCommitBeforeMoved",
+    "defaultValue": false,
+    "type": "BOOL",
+    "label": "Force Commit Before Moved",
+    "description": "Do force commit on consuming segments before they are rebalanced",
+    "isAdvancedConfig": false,
+    "isStatsGatheringConfig": false,
+    "markWithWarningIcon": false
+  },
+  {
+    "name": "forceCommitBatchSize",
+    "defaultValue": 2147483647,
+    "type": "INTEGER",
+    "label": "Force Commit Batch Size",
+    "description": "If forceCommitBeforeMoved is set, this is the batch size for force commit operations. Controls how many segments are force committed in each batch. (Default to Integer.MAX to disable batching)",
+    "isAdvancedConfig": true,
+    "isStatsGatheringConfig": false,
+    "markWithWarningIcon": false
+  },
+  {
+    "name": "forceCommitBatchStatusCheckTimeoutMs",
Review Comment:
Wasn't there a 4th parameter that was added?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig = ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+      segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+          segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+          forceCommitBatchConfig);
+      try {
+        // Wait until all committed segments have their status set to DONE.
+        // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+        // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+        // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+        // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+        // happens only after the committing segment status is set to DONE.
+        _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+            forceCommitBatchConfig);
+      } catch (Exception e) {
+        tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+      }
Review Comment:
Yes, the rebalance should be marked as failed if this happens, for now. We can revisit this later if we actually find cases where failing the rebalance is not useful. In the worst case, the user can retry the rebalance with forceCommit=false.
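The suggested behavior can be sketched as follows. This is a minimal, self-contained illustration of the reviewer's point (fail the step instead of only warning); the class, method, and status names are hypothetical stand-ins, not the PR's actual code:

```java
import java.util.concurrent.TimeoutException;

// Hedged sketch: instead of swallowing the wait failure with a warn log,
// surface it so the rebalance is marked FAILED. All names here are
// illustrative stand-ins, not the PR's actual code.
public class ForceCommitWaitSketch {
  // Stand-in for PinotLLCRealtimeSegmentManager#waitUntilPrevBatchIsComplete
  static void waitUntilPrevBatchIsComplete(boolean succeeds) throws TimeoutException {
    if (!succeeds) {
      throw new TimeoutException("committed segments did not reach DONE in time");
    }
  }

  // Returns the rebalance status for this step: "DONE" on success, "FAILED"
  // on a wait error (the user can then retry with forceCommit=false).
  static String runForceCommitStep(boolean waitSucceeds) {
    try {
      waitUntilPrevBatchIsComplete(waitSucceeds);
      return "DONE";
    } catch (TimeoutException e) {
      return "FAILED";
    }
  }

  public static void main(String[] args) {
    System.out.println(runForceCommitStep(true));   // DONE
    System.out.println(runForceCommitStep(false));  // FAILED
  }
}
```

Propagating the failure keeps the semantics simple: a rebalance that could not confirm the force-commit finished is reported as failed rather than silently continuing.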
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -231,14 +237,18 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
     boolean enableStrictReplicaGroup = tableConfig.getRoutingConfig() != null
         && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
             tableConfig.getRoutingConfig().getInstanceSelectorType());
+    boolean forceCommitBeforeMoved =
+        tableConfig.getTableType() == TableType.REALTIME && rebalanceConfig.isForceCommitBeforeMoved();
     tableRebalanceLogger.info(
         "Start rebalancing with dryRun: {}, preChecks: {}, reassignInstances: {}, "
             + "includeConsuming: {}, bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, "
             + "enableStrictReplicaGroup: {}, lowDiskMode: {}, bestEfforts: {}, batchSizePerServer: {}, "
-            + "externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}, minimizeDataMovement: {}",
+            + "externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}, minimizeDataMovement: {}, "
+            + "forceCommitBeforeMoved: {}",
         dryRun, preChecks, reassignInstances, includeConsuming, bootstrap, downtime,
         minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, lowDiskMode, bestEfforts, batchSizePerServer,
-        externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs, minimizeDataMovement);
+        externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs, minimizeDataMovement,
+        forceCommitBeforeMoved);
Review Comment:
A pre-check is overkill IMO, as this flag should be a no-op for OFFLINE tables even if enabled. I think just adding a warning log and toggling this to false is good enough. We don't warn for includeConsuming in pre-checks either.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
Review Comment:
Let's keep this consistent everywhere. I believe one of the segment assignment utils also does the check the same way. If we want to simplify, I recommend opening a new PR that makes the change everywhere in one go and testing it out.
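The consistency point above can be sketched as a single shared predicate for "this instance-state map represents a consuming segment" (no ONLINE replica, at least one CONSUMING replica), reusable from both TableRebalancer and the segment assignment utils. The helper name here is illustrative, not an existing Pinot util:

```java
import java.util.Map;

// Hedged sketch: one shared predicate for the ONLINE/CONSUMING check, so
// every call site uses identical semantics. The class and method names are
// illustrative stand-ins, not from the PR.
public class ConsumingStateCheckSketch {
  // Stand-ins for SegmentStateModel.ONLINE / SegmentStateModel.CONSUMING
  static final String ONLINE = "ONLINE";
  static final String CONSUMING = "CONSUMING";

  // True iff no replica is ONLINE and at least one replica is CONSUMING.
  static boolean isConsumingSegmentState(Map<String, String> instanceStateMap) {
    return instanceStateMap.values().stream().noneMatch(ONLINE::equals)
        && instanceStateMap.values().stream().anyMatch(CONSUMING::equals);
  }

  public static void main(String[] args) {
    // All replicas CONSUMING -> consuming segment
    System.out.println(isConsumingSegmentState(Map.of("server1", CONSUMING, "server2", CONSUMING))); // true
    // Any ONLINE replica -> already committed, not consuming
    System.out.println(isConsumingSegmentState(Map.of("server1", ONLINE, "server2", CONSUMING)));    // false
  }
}
```

Extracting the predicate would let the simplification happen once, in the separate PR the reviewer suggests, with all call sites tested together.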
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java:
##########
@@ -136,6 +136,22 @@ public class RebalanceConfig {
   @ApiModelProperty(example = "300000")
   private long _retryInitialDelayInMs = 300000L;
+  @JsonProperty("forceCommitBeforeMoved")
+  @ApiModelProperty(example = "false")
+  private boolean _forceCommitBeforeMoved = false;
Review Comment:
Let's rename this to be consistent with the API parameter.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -410,6 +410,11 @@ private RebalancePreCheckerResult checkRebalanceConfig(RebalanceConfig rebalance
       }
     }
+    if (tableConfig.getTableType() == TableType.OFFLINE && rebalanceConfig.isForceCommitBeforeMoved()) {
+      pass = false;
+      warnings.add("forceCommitBeforeMoved is set for OFFLINE table, which will be ignored.");
Review Comment:
Though, I think adding this to pre-checks is probably overkill. Why not just set it to `false` internally and add a warning log instead? We don't have a similar pre-check for `includeConsuming` either, do we?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig = ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+      segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+          segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+          forceCommitBatchConfig);
+      try {
+        // Wait until all committed segments have their status set to DONE.
+        // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+        // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+        // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+        // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+        // happens only after the committing segment status is set to DONE.
+        _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+            forceCommitBatchConfig);
+      } catch (Exception e) {
+        tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+      }
+    } else {
+      tableRebalanceLogger.warn(
+          "PinotLLCRealtimeSegmentManager is not initialized, cannot force commit consuming segments");
+    }
Review Comment:
We have a ticket to move this command to using the controller API. Perhaps that change can be made first, and then this code can be cleaned up as part of it.
##########
pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerOptions.ts:
##########
@@ -174,5 +174,35 @@ export const rebalanceServerOptions: RebalanceServerOption[] = [
     "isAdvancedConfig": true,
     "isStatsGatheringConfig": false,
     "markWithWarningIcon": false
+  },
+  {
+    "name": "forceCommitBeforeMoved",
Review Comment:
let's use the updated name here and in the `label`
##########
pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerOptions.ts:
##########
@@ -174,5 +174,35 @@ export const rebalanceServerOptions: RebalanceServerOption[] = [
     "isAdvancedConfig": true,
     "isStatsGatheringConfig": false,
     "markWithWarningIcon": false
+  },
+  {
+    "name": "forceCommitBeforeMoved",
+    "defaultValue": false,
+    "type": "BOOL",
+    "label": "Force Commit Before Moved",
+    "description": "Do force commit on consuming segments before they are rebalanced",
+    "isAdvancedConfig": false,
+    "isStatsGatheringConfig": false,
+    "markWithWarningIcon": false
+  },
+  {
+    "name": "forceCommitBatchSize",
+    "defaultValue": 2147483647,
+    "type": "INTEGER",
+    "label": "Force Commit Batch Size",
+    "description": "If forceCommitBeforeMoved is set, this is the batch size for force commit operations. Controls how many segments are force committed in each batch. (Default to Integer.MAX to disable batching)",
+    "isAdvancedConfig": true,
+    "isStatsGatheringConfig": false,
+    "markWithWarningIcon": false
+  },
+  {
+    "name": "forceCommitBatchStatusCheckTimeoutMs",
+    "defaultValue": 180000,
+    "type": "INTEGER",
+    "label": "Force Commit Status Check Timeout (ms)",
+    "description": "If forceCommitBeforeMoved is set, this is the timeout in milliseconds for force commit batch status checks. Maximum time to wait for force commit operations to complete",
Review Comment:
Just make sure to rename `forceCommitBeforeMoved` throughout this PR.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -665,6 +665,8 @@ public RebalanceResult rebalance(
       @QueryParam("retryInitialDelayInMs") long retryInitialDelayInMs,
       @ApiParam(value = "Whether to update segment target tier as part of the rebalance") @DefaultValue("false")
       @QueryParam("updateTargetTier") boolean updateTargetTier,
+      @ApiParam(value = "Do force commit on consuming segments before they are rebalanced") @DefaultValue("false")
+      @QueryParam("forceCommitBeforeMoved") boolean forceCommitBeforeMoved,
Review Comment:
I'm okay with `forceCommitConsumingSegments` as well, but definitely rename both the API parameter and the internal variable. Let's always keep these consistent as far as possible.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]