somandal commented on code in PR #16096:
URL: https://github.com/apache/pinot/pull/16096#discussion_r2153381331


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -410,6 +410,11 @@ private RebalancePreCheckerResult checkRebalanceConfig(RebalanceConfig rebalance
       }
     }
 
+    if (tableConfig.getTableType() == TableType.OFFLINE && rebalanceConfig.isForceCommitBeforeMoved()) {
+      pass = false;
+      warnings.add("forceCommitBeforeMoved is set for OFFLINE table, which will be ignored.");

Review Comment:
   let's rename this to match the parameter name



##########
pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerOptions.ts:
##########
@@ -174,5 +174,35 @@ export const rebalanceServerOptions: RebalanceServerOption[] = [
         "isAdvancedConfig": true,
         "isStatsGatheringConfig": false,
         "markWithWarningIcon": false
+    },
+    {
+        "name": "forceCommitBeforeMoved",
+        "defaultValue": false,
+        "type": "BOOL",
+        "label": "Force Commit Before Moved",
+        "description": "Do force commit on consuming segments before they are rebalanced",
+        "isAdvancedConfig": false,
+        "isStatsGatheringConfig": false,
+        "markWithWarningIcon": false
+    },
+    {
+        "name": "forceCommitBatchSize",
+        "defaultValue": 2147483647,
+        "type": "INTEGER",
+        "label": "Force Commit Batch Size",
+        "description": "If forceCommitBeforeMoved is set, this is the batch size for force commit operations. Controls how many segments are force committed in each batch. (Default to Integer.MAX to disable batching)",
+        "isAdvancedConfig": true,
+        "isStatsGatheringConfig": false,
+        "markWithWarningIcon": false
+    },
+    {
+        "name": "forceCommitBatchStatusCheckTimeoutMs",

Review Comment:
   wasn't there a 4th parameter that was added?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+      segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+          segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+          forceCommitBatchConfig);
+      try {
+        // Wait until all committed segments have their status set to DONE.
+        // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+        // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+        // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+        // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+        // happens only after the committing segment status is set to DONE.
+        _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+            forceCommitBatchConfig);
+      } catch (Exception e) {
+        tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+      }

Review Comment:
   Yes, the rebalance should be marked as failed if this happens, at least for now. We can revisit this later if we actually find cases where failing the rebalance is not useful. In the worst case, the user can retry the rebalance with forceCommit=false.
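A minimal sketch of the behavior agreed on above, assuming a simplified stand-in for the wait call: if waiting for the force-committed segments to reach DONE throws (for example on timeout), the failure is surfaced as a FAILED status instead of only being logged as a warning. `CommitWaiter`, `RebalanceStatus` and `waitForForceCommit` are hypothetical names for illustration, not Pinot's API.

```java
import java.util.Set;
import java.util.concurrent.TimeoutException;

// Sketch only: simplified, hypothetical stand-ins for the rebalancer types, not Pinot's API.
public class ForceCommitFailureSketch {

  enum RebalanceStatus { IN_PROGRESS, FAILED }

  @FunctionalInterface
  interface CommitWaiter {
    // Blocks until every segment in the batch reaches DONE, or throws (for example on timeout).
    void waitUntilDone(String tableNameWithType, Set<String> segments) throws Exception;
  }

  // Surfaces a failed wait as FAILED instead of only logging a warning and continuing.
  static RebalanceStatus waitForForceCommit(CommitWaiter waiter, String tableNameWithType,
      Set<String> segmentsToCommit) {
    try {
      waiter.waitUntilDone(tableNameWithType, segmentsToCommit);
      return RebalanceStatus.IN_PROGRESS;
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // Preserve the interrupt flag for the caller.
        Thread.currentThread().interrupt();
      }
      System.err.println("Force commit wait failed for " + tableNameWithType + ": " + e.getMessage()
          + ". Marking the rebalance as FAILED; it can be retried with force commit disabled.");
      return RebalanceStatus.FAILED;
    }
  }

  public static void main(String[] args) {
    Set<String> segments = Set.of("seg_0");
    RebalanceStatus ok = waitForForceCommit((table, segs) -> { }, "myTable_REALTIME", segments);
    RebalanceStatus failed = waitForForceCommit((table, segs) -> {
      throw new TimeoutException("segments did not reach DONE before the timeout");
    }, "myTable_REALTIME", segments);
    System.out.println(ok + " " + failed); // prints "IN_PROGRESS FAILED"
  }
}
```

Returning a status keeps the sketch independent of any particular exception type; in the PR itself the equivalent would presumably be propagating the failure so the rebalance result ends up FAILED, with retrying under forceCommit=false as the user's fallback.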



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -231,14 +237,18 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
     boolean enableStrictReplicaGroup = tableConfig.getRoutingConfig() != null
         && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
         tableConfig.getRoutingConfig().getInstanceSelectorType());
+    boolean forceCommitBeforeMoved =
+        tableConfig.getTableType() == TableType.REALTIME && rebalanceConfig.isForceCommitBeforeMoved();
     tableRebalanceLogger.info(
         "Start rebalancing with dryRun: {}, preChecks: {}, reassignInstances: {}, "
             + "includeConsuming: {}, bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, "
             + "enableStrictReplicaGroup: {}, lowDiskMode: {}, bestEfforts: {}, batchSizePerServer: {}, "
-            + "externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}, minimizeDataMovement: {}",
+            + "externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}, minimizeDataMovement: {}, "
+            + "forceCommitBeforeMoved: {}",
         dryRun, preChecks, reassignInstances, includeConsuming, bootstrap, downtime,
         minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, lowDiskMode, bestEfforts, batchSizePerServer,
-        externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs, minimizeDataMovement);
+        externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs, minimizeDataMovement,
+        forceCommitBeforeMoved);

Review Comment:
   A pre-check is overkill IMO, as this flag should be a no-op for OFFLINE tables even if enabled. I think just adding a warning log and toggling this to false is good enough. We don't warn for includeConsuming in pre-checks either.
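The warn-and-toggle approach suggested here could look roughly like the following sketch. `TableType` is a local stand-in enum and `effectiveForceCommit` is a hypothetical helper; in the PR the flag comes from `RebalanceConfig`, and the warning would presumably go through `tableRebalanceLogger` rather than `java.util.logging`.

```java
import java.util.logging.Logger;

// Sketch only: TableType is a local stand-in and effectiveForceCommit is a hypothetical helper.
public class ForceCommitFlagSketch {

  enum TableType { OFFLINE, REALTIME }

  private static final Logger LOGGER = Logger.getLogger(ForceCommitFlagSketch.class.getName());

  // Returns the flag value the rebalance should actually use: force commit only applies to
  // REALTIME tables, so a requested value on an OFFLINE table is logged and toggled off.
  static boolean effectiveForceCommit(TableType tableType, boolean requested) {
    if (requested && tableType != TableType.REALTIME) {
      LOGGER.warning("Force commit was requested for an OFFLINE table; ignoring it");
      return false;
    }
    return requested;
  }

  public static void main(String[] args) {
    System.out.println(effectiveForceCommit(TableType.OFFLINE, true));   // false, plus a warning log
    System.out.println(effectiveForceCommit(TableType.REALTIME, true));  // true
    System.out.println(effectiveForceCommit(TableType.REALTIME, false)); // false
  }
}
```

Compared with a failing pre-check, this keeps the rebalance usable for OFFLINE tables and mirrors how `includeConsuming` is treated, which is the reviewer's point.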



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {

Review Comment:
   Let's keep this consistent everywhere. I believe one of the segment assignment utils also does the check in the same way. If we want to simplify it, I recommend opening a new PR that makes the change everywhere in one go and tests it.
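If the follow-up PR suggested here consolidates the check, one possible shape is a single shared predicate that every call site uses, so the "consuming" definition cannot drift between places. This is a sketch under assumptions: `isConsumingSegment` is a hypothetical helper name, and the `ONLINE`/`CONSUMING` constants are local stand-ins for `SegmentStateModel`. The moving-segment logic mirrors `getMovingConsumingSegments` from the diff.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class MovingConsumingSegmentsSketch {
  // Local stand-ins for SegmentStateModel constants (assumed string values).
  static final String ONLINE = "ONLINE";
  static final String CONSUMING = "CONSUMING";

  // Hypothetical shared helper: no replica is ONLINE and at least one replica is CONSUMING.
  static boolean isConsumingSegment(Map<String, String> instanceStateMap) {
    return !instanceStateMap.containsValue(ONLINE) && instanceStateMap.containsValue(CONSUMING);
  }

  // Same logic as getMovingConsumingSegments in the diff, built on the shared helper:
  // a consuming segment "moves" when its target instance set differs from the current one.
  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
      Map<String, Map<String, String>> targetAssignment) {
    Set<String> moving = new HashSet<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      Map<String, String> target = targetAssignment.get(entry.getKey());
      if (target != null && isConsumingSegment(target)
          && !entry.getValue().keySet().equals(target.keySet())) {
        moving.add(entry.getKey());
      }
    }
    return moving;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> current = new HashMap<>();
    current.put("seg_0", Map.of("server1", CONSUMING, "server2", CONSUMING));
    current.put("seg_1", Map.of("server1", ONLINE, "server2", ONLINE));
    current.put("seg_2", Map.of("server1", CONSUMING, "server2", CONSUMING));

    Map<String, Map<String, String>> target = new HashMap<>();
    target.put("seg_0", Map.of("server1", CONSUMING, "server3", CONSUMING)); // consuming, moves
    target.put("seg_1", Map.of("server3", ONLINE, "server2", ONLINE));       // moves, but ONLINE
    target.put("seg_2", Map.of("server1", CONSUMING, "server2", CONSUMING)); // consuming, stays

    System.out.println(new TreeSet<>(getMovingConsumingSegments(current, target))); // prints "[seg_0]"
  }
}
```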



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java:
##########
@@ -136,6 +136,22 @@ public class RebalanceConfig {
   @ApiModelProperty(example = "300000")
   private long _retryInitialDelayInMs = 300000L;
 
+  @JsonProperty("forceCommitBeforeMoved")
+  @ApiModelProperty(example = "false")
+  private boolean _forceCommitBeforeMoved = false;

Review Comment:
   let's rename this to be consistent with the API parameter



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -410,6 +410,11 @@ private RebalancePreCheckerResult checkRebalanceConfig(RebalanceConfig rebalance
       }
     }
 
+    if (tableConfig.getTableType() == TableType.OFFLINE && rebalanceConfig.isForceCommitBeforeMoved()) {
+      pass = false;
+      warnings.add("forceCommitBeforeMoved is set for OFFLINE table, which will be ignored.");

Review Comment:
   Though, I think adding this to pre-checks is probably overkill. Why not just set it to `false` internally and add a warning log instead? We don't have a similar pre-check for `includeConsuming` either, do we?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+      segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+          segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+          forceCommitBatchConfig);
+      try {
+        // Wait until all committed segments have their status set to DONE.
+        // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+        // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+        // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+        // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+        // happens only after the committing segment status is set to DONE.
+        _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+            forceCommitBatchConfig);
+      } catch (Exception e) {
+        tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+      }
+    } else {
+      tableRebalanceLogger.warn(
+          "PinotLLCRealtimeSegmentManager is not initialized, cannot force commit consuming segments");
+    }

Review Comment:
   We have a ticket to move the command to using the controller API. Perhaps that change can be made, and this code cleaned up, as part of that.



##########
pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerOptions.ts:
##########
@@ -174,5 +174,35 @@ export const rebalanceServerOptions: RebalanceServerOption[] = [
         "isAdvancedConfig": true,
         "isStatsGatheringConfig": false,
         "markWithWarningIcon": false
+    },
+    {
+        "name": "forceCommitBeforeMoved",

Review Comment:
   let's use the updated name here and in the `label`



##########
pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerOptions.ts:
##########
@@ -174,5 +174,35 @@ export const rebalanceServerOptions: RebalanceServerOption[] = [
         "isAdvancedConfig": true,
         "isStatsGatheringConfig": false,
         "markWithWarningIcon": false
+    },
+    {
+        "name": "forceCommitBeforeMoved",
+        "defaultValue": false,
+        "type": "BOOL",
+        "label": "Force Commit Before Moved",
+        "description": "Do force commit on consuming segments before they are rebalanced",
+        "isAdvancedConfig": false,
+        "isStatsGatheringConfig": false,
+        "markWithWarningIcon": false
+    },
+    {
+        "name": "forceCommitBatchSize",
+        "defaultValue": 2147483647,
+        "type": "INTEGER",
+        "label": "Force Commit Batch Size",
+        "description": "If forceCommitBeforeMoved is set, this is the batch size for force commit operations. Controls how many segments are force committed in each batch. (Default to Integer.MAX to disable batching)",
+        "isAdvancedConfig": true,
+        "isStatsGatheringConfig": false,
+        "markWithWarningIcon": false
+    },
+    {
+        "name": "forceCommitBatchStatusCheckTimeoutMs",
+        "defaultValue": 180000,
+        "type": "INTEGER",
+        "label": "Force Commit Status Check Timeout (ms)",
+        "description": "If forceCommitBeforeMoved is set, this is the timeout in milliseconds for force commit batch status checks. Maximum time to wait for force commit operations to complete",

Review Comment:
   just make sure to rename the `forceCommitBeforeMoved` throughout this PR



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -665,6 +665,8 @@ public RebalanceResult rebalance(
       @QueryParam("retryInitialDelayInMs") long retryInitialDelayInMs,
       @ApiParam(value = "Whether to update segment target tier as part of the rebalance") @DefaultValue("false")
       @QueryParam("updateTargetTier") boolean updateTargetTier,
+      @ApiParam(value = "Do force commit on consuming segments before they are rebalanced") @DefaultValue("false")
+      @QueryParam("forceCommitBeforeMoved") boolean forceCommitBeforeMoved,

Review Comment:
   I'm okay with `forceCommitConsumingSegments` as well, but definitely rename both the parameter name and the internal variable name. Let's always keep these consistent as far as possible.


