yashmayya commented on code in PR #16096:
URL: https://github.com/apache/pinot/pull/16096#discussion_r2153656792


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -665,6 +665,8 @@ public RebalanceResult rebalance(
       @QueryParam("retryInitialDelayInMs") long retryInitialDelayInMs,
       @ApiParam(value = "Whether to update segment target tier as part of the rebalance") @DefaultValue("false")
       @QueryParam("updateTargetTier") boolean updateTargetTier,
+      @ApiParam(value = "Do force commit on consuming segments before they are rebalanced") @DefaultValue("false")
+      @QueryParam("forceCommitBeforeMoved") boolean forceCommitBeforeMoved,

Review Comment:
   +1 for renaming both the parameter and all the internal names. `forceCommit` also seems okay as long as the parameter's description is good, but I think `forceCommitConsumingSegments` might better convey the intent to users who aren't already familiar with what force commits are in the context of realtime tables and consuming segments.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+      segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+          segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+          forceCommitBatchConfig);
+      try {
+        // Wait until all committed segments have their status set to DONE.
+        // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+        // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+        // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+        // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+        // happens only after the committing segment status is set to DONE.
+        _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+            forceCommitBatchConfig);
+      } catch (Exception e) {
+        tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+      }
+    } else {
+      tableRebalanceLogger.warn(
+          "PinotLLCRealtimeSegmentManager is not initialized, cannot force commit consuming segments");
+    }

Review Comment:
   > Unless the PinotHelixResourceManager have a null _pinotLLCRealtimeSegmentManager, which is unlikely?
   
   Yeah, this shouldn't be possible.
   
   > we have a ticket to move the command to using the controller API. Perhaps that change can be made and then this code cleaned as part of that
   
   Yeah, sounds good. If we already have a ticket for that, we can leave this part as is for now.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+      segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+          segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+          forceCommitBatchConfig);
+      try {
+        // Wait until all committed segments have their status set to DONE.
+        // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+        // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+        // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+        // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+        // happens only after the committing segment status is set to DONE.
+        _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+            forceCommitBatchConfig);
+      } catch (Exception e) {
+        tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+      }

Review Comment:
   > My initiative was that this flag tries force commit to reduce the burden from the newly spinning up servers, so it's more like an add-on. If this add-on doesn't work the whole rebalance would just be a regular rebalance.
   
   There could be use cases where the consumption offset lag introduced by rebalancing consuming segments is not acceptable, and the force commit config would be used to prevent it. Ignoring force commit failures or timeouts and proceeding with the rebalance could violate data freshness SLAs in such cases.
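
   To make the two behaviors concrete, here is a rough, self-contained sketch. It is not Pinot code: `ForceCommitStep`, `Outcome`, `rebalanceLenient`, and `rebalanceStrict` are hypothetical names invented for illustration, and the real change would need to decide how an aborted rebalance is reported in `RebalanceResult`. The lenient variant mirrors the `catch` block above (log and continue); the strict variant stops before any segment is moved.

```java
import java.util.Set;
import java.util.concurrent.TimeoutException;

public class ForceCommitFailFastSketch {

  /** Hypothetical stand-in for "force commit the segments and wait for DONE"; not a Pinot API. */
  interface ForceCommitStep {
    Set<String> commitAndWait(Set<String> segments) throws Exception;
  }

  /** Hypothetical outcome type used only by this sketch. */
  enum Outcome { PROCEEDED, ABORTED }

  /** Lenient: a failed or timed-out force commit is logged and the rebalance continues. */
  static Outcome rebalanceLenient(ForceCommitStep step, Set<String> segments) {
    try {
      step.commitAndWait(segments);
    } catch (Exception e) {
      System.err.println("WARN: force commit did not complete, proceeding anyway: " + e.getMessage());
    }
    return Outcome.PROCEEDED;
  }

  /** Strict: a failed or timed-out force commit aborts the rebalance before segments move. */
  static Outcome rebalanceStrict(ForceCommitStep step, Set<String> segments) {
    try {
      step.commitAndWait(segments);
    } catch (Exception e) {
      System.err.println("ERROR: force commit did not complete, aborting rebalance: " + e.getMessage());
      return Outcome.ABORTED;
    }
    return Outcome.PROCEEDED;
  }

  public static void main(String[] args) {
    // Simulate a wait that never sees the segments reach DONE.
    ForceCommitStep timesOut = segments -> {
      throw new TimeoutException("segments not DONE within timeout");
    };
    Set<String> segments = Set.of("exampleSegment");

    System.out.println("lenient: " + rebalanceLenient(timesOut, segments));
    System.out.println("strict:  " + rebalanceStrict(timesOut, segments));
  }
}
```

   With the simulated timeout, the lenient path still reports `PROCEEDED` while the strict path reports `ABORTED`. The strict path is what a user relying on the flag for freshness guarantees would probably expect.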



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

