J-HowHuang commented on code in PR #16096:
URL: https://github.com/apache/pinot/pull/16096#discussion_r2152734886


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+      segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+          segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+          forceCommitBatchConfig);
+      try {
+        // Wait until all committed segments have their status set to DONE.
+        // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+        // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+        // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+        // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+        // happens only after the committing segment status is set to DONE.
+        _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+            forceCommitBatchConfig);
+      } catch (Exception e) {
+        tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+      }

Review Comment:
   My intention was for this flag to attempt a force commit in order to reduce the load on the newly spun-up servers, so it is more of an add-on. If the add-on fails, the rebalance simply proceeds as a regular rebalance.
   
   Your point is valid too, but I'm not sure how strongly rebalance users want the force commit to be enforced. @somandal any thoughts on this?
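
To make the two options concrete, here is a rough sketch of the trade-off. It is not code from this PR: the `Policy` enum, the `tryForceCommit` helper, and the `Callable` standing in for the force-commit-and-wait step are all hypothetical names used only for illustration. `BEST_EFFORT` mirrors the current behavior (log and carry on as a regular rebalance), while `STRICT` is what enforcing the force commit might look like.

```java
import java.util.concurrent.Callable;

public class ForceCommitPolicySketch {
  /** Hypothetical switch, not defined anywhere in the PR. */
  enum Policy { BEST_EFFORT, STRICT }

  /**
   * Runs the (stand-in) force-commit-and-wait step.
   * Returns true if it succeeded, false if it failed and the caller should
   * continue as a regular rebalance. Under STRICT, a failure aborts instead.
   */
  static boolean tryForceCommit(Callable<Void> forceCommitAndWait, Policy policy) {
    try {
      forceCommitAndWait.call();
      return true;
    } catch (Exception e) {
      if (policy == Policy.STRICT) {
        // Enforced variant: surface the failure so the rebalance stops here.
        throw new IllegalStateException("Force commit failed, aborting rebalance", e);
      }
      // Add-on variant: warn and fall back to a regular rebalance.
      System.out.println("Force commit failed, continuing as a regular rebalance: " + e.getMessage());
      return false;
    }
  }
}
```

With something like this, the remaining question is only which policy should be the default, or whether it should be exposed to users at all.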



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

