somandal commented on code in PR #16096:
URL: https://github.com/apache/pinot/pull/16096#discussion_r2152596004


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -2185,7 +2185,7 @@ private void processBatchesSequentially(List<Set<String>> segmentBatchList, Stri
     }
   }
 
-  private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit,
+  public void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit,

Review Comment:
   This feels like an internal implementation detail of how forceCommit
processes batches, and IMO it shouldn't be exposed as public. Consider
creating a utility function with an appropriate name in a common location and
calling it from both code paths. Alternatively, consider tracking completion
through the job metadata in ZK (similar to how forceCommit job status can be
fetched).
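
   For the utility-function option, a minimal sketch of a shared polling
helper could look like the following. `CompletionWaiter` and `waitUntil` are
hypothetical names, not existing Pinot classes, and the completion check is
passed in as a supplier so each caller decides what "complete" means.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical shared helper; the name and signature are illustrative, not existing Pinot code. */
final class CompletionWaiter {
  private CompletionWaiter() {
  }

  /**
   * Polls {@code isComplete} every {@code intervalMs} until it returns true or {@code timeoutMs} elapses.
   * Returns true if the condition was met, false on timeout or if the waiting thread is interrupted.
   */
  static boolean waitUntil(BooleanSupplier isComplete, long intervalMs, long timeoutMs) {
    if (intervalMs <= 0 || timeoutMs < 0) {
      throw new IllegalArgumentException("intervalMs must be > 0 and timeoutMs must be >= 0");
    }
    long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
    while (true) {
      if (isComplete.getAsBoolean()) {
        return true;
      }
      long remainingMs = (deadlineNanos - System.nanoTime()) / 1_000_000L;
      if (remainingMs <= 0) {
        return false;
      }
      try {
        // Never sleep past the deadline.
        Thread.sleep(Math.min(intervalMs, remainingMs));
      } catch (InterruptedException e) {
        // Restore the interrupt flag and report the wait as not completed.
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }
}
```

   Both the forceCommit batching path and the rebalancer could then call this
helper with their own completion check, which keeps
`waitUntilPrevBatchIsComplete` private.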



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);

Review Comment:
   I think we should expose these as rebalance config options in case users
want to provide their own values, for example to limit how many segments to
forceCommit at a time. We can choose some good defaults, and +1 to Yash's
comment on using constants.
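
   As a rough sketch of what that could look like: named constants for the
defaults plus a small validated holder. `ForceCommitOptions` and its field
names are hypothetical, not existing `RebalanceConfig` fields, and I'm
assuming the three literals in the diff mean batch size, status check interval
in seconds, and status check timeout in seconds.

```java
/** Hypothetical options holder; names are assumptions, not existing RebalanceConfig fields. */
final class ForceCommitOptions {
  // Defaults mirror the literals in the diff: ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180).
  // Assumed meaning: batch size, status check interval (seconds), status check timeout (seconds).
  static final int DEFAULT_BATCH_SIZE = Integer.MAX_VALUE;
  static final int DEFAULT_CHECK_INTERVAL_SEC = 5;
  static final int DEFAULT_CHECK_TIMEOUT_SEC = 180;

  private final int _batchSize;
  private final int _checkIntervalSec;
  private final int _checkTimeoutSec;

  private ForceCommitOptions(int batchSize, int checkIntervalSec, int checkTimeoutSec) {
    _batchSize = batchSize;
    _checkIntervalSec = checkIntervalSec;
    _checkTimeoutSec = checkTimeoutSec;
  }

  /** Returns options populated with the default constants above. */
  static ForceCommitOptions defaults() {
    return new ForceCommitOptions(DEFAULT_BATCH_SIZE, DEFAULT_CHECK_INTERVAL_SEC, DEFAULT_CHECK_TIMEOUT_SEC);
  }

  /** Builds options from user-provided values, rejecting values that cannot work. */
  static ForceCommitOptions of(int batchSize, int checkIntervalSec, int checkTimeoutSec) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive, got: " + batchSize);
    }
    if (checkIntervalSec <= 0) {
      throw new IllegalArgumentException("checkIntervalSec must be positive, got: " + checkIntervalSec);
    }
    if (checkTimeoutSec < checkIntervalSec) {
      throw new IllegalArgumentException("checkTimeoutSec must be >= checkIntervalSec");
    }
    return new ForceCommitOptions(batchSize, checkIntervalSec, checkTimeoutSec);
  }

  int getBatchSize() {
    return _batchSize;
  }

  int getCheckIntervalSec() {
    return _checkIntervalSec;
  }

  int getCheckTimeoutSec() {
    return _checkTimeoutSec;
  }
}
```

   The rebalancer would then pass these three values to
`ForceCommitBatchConfig.of(...)` instead of the inline literals.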



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -665,6 +665,8 @@ public RebalanceResult rebalance(
       @QueryParam("retryInitialDelayInMs") long retryInitialDelayInMs,
       @ApiParam(value = "Whether to update segment target tier as part of the rebalance") @DefaultValue("false")
       @QueryParam("updateTargetTier") boolean updateTargetTier,
+      @ApiParam(value = "Do force commit on consuming segments before they are rebalanced") @DefaultValue("false")
+      @QueryParam("forceCommitBeforeMoved") boolean forceCommitBeforeMoved,

Review Comment:
   Can we perhaps simplify the name even further, e.g. `forceCommit`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -231,14 +237,18 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
     boolean enableStrictReplicaGroup = tableConfig.getRoutingConfig() != null
         && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
         tableConfig.getRoutingConfig().getInstanceSelectorType());
+    boolean forceCommitBeforeMoved =
+        tableConfig.getTableType() == TableType.REALTIME && rebalanceConfig.isForceCommitBeforeMoved();
     tableRebalanceLogger.info(
         "Start rebalancing with dryRun: {}, preChecks: {}, reassignInstances: {}, "
             + "includeConsuming: {}, bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, "
             + "enableStrictReplicaGroup: {}, lowDiskMode: {}, bestEfforts: {}, batchSizePerServer: {}, "
-            + "externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}, minimizeDataMovement: {}",
+            + "externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}, minimizeDataMovement: {}, "
+            + "forceCommitBeforeMoved: {}",
         dryRun, preChecks, reassignInstances, includeConsuming, bootstrap, downtime,
         minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, lowDiskMode, bestEfforts, batchSizePerServer,
-        externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs, minimizeDataMovement);
+        externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs, minimizeDataMovement,
+        forceCommitBeforeMoved);

Review Comment:
   Let's either add validation that rejects this option for `OFFLINE` tables,
or internally set it to `false` for `OFFLINE` tables and log a warning. We
don't do validations for `includeConsuming` either, so I'm okay with the
warning log plus internally setting this to `false` if that's better.
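
   A minimal sketch of the log-and-override option is below.
`ForceCommitFlagResolver` is a hypothetical name, the nested `TableType` enum
is only a stand-in for Pinot's own `TableType`, and the warning sink stands in
for `tableRebalanceLogger.warn(...)`.

```java
import java.util.function.Consumer;

/** Hypothetical helper; illustrates the "warn and set to false" option, not existing Pinot code. */
final class ForceCommitFlagResolver {
  /** Stand-in for Pinot's TableType so the sketch is self-contained. */
  enum TableType {
    OFFLINE, REALTIME
  }

  private ForceCommitFlagResolver() {
  }

  /**
   * Returns the effective flag value. If the option was requested for a non-REALTIME table,
   * emits a warning through {@code warn} and returns false instead of failing the rebalance.
   */
  static boolean resolve(TableType tableType, boolean requested, Consumer<String> warn) {
    if (requested && tableType != TableType.REALTIME) {
      warn.accept("forceCommitBeforeMoved is only supported for REALTIME tables, ignoring it for "
          + tableType + " table");
      return false;
    }
    return requested;
  }
}
```

   This keeps the behavior already in the diff (the flag is only effective for
`REALTIME`) and only adds the warning, so an `OFFLINE` rebalance with the flag
set is not silently ignored.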



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

