J-HowHuang commented on code in PR #16096:
URL: https://github.com/apache/pinot/pull/16096#discussion_r2152704788


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+      segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+          segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+          forceCommitBatchConfig);
+      try {
+        // Wait until all committed segments have their status set to DONE.
+        // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+        // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+        // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+        // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+        // happens only after the committing segment status is set to DONE.
+        _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+            forceCommitBatchConfig);
+      } catch (Exception e) {
+        tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+      }
+    } else {
+      tableRebalanceLogger.warn(
+          "PinotLLCRealtimeSegmentManager is not initialized, cannot force commit consuming segments");
+    }

Review Comment:
   Right, other than tests, `PinotTableRebalancer` is the only call path that could rebalance without a realtime segment manager, unless the `PinotHelixResourceManager` has a null `_pinotLLCRealtimeSegmentManager`, which seems unlikely?
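For reference, the selection rule in `getMovingConsumingSegments` can be exercised in isolation. This is a minimal standalone sketch, not the PR's code: it assumes plain `"ONLINE"`/`"CONSUMING"` strings in place of Pinot's `SegmentStateModel` constants, and the class name, segment names, and server names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MovingConsumingSegments {
  // Assumption: stand-ins for SegmentStateModel.ONLINE / SegmentStateModel.CONSUMING.
  static final String ONLINE = "ONLINE";
  static final String CONSUMING = "CONSUMING";

  // Mirrors the rule in the diff: a segment is "moving consuming" if its target state map
  // has no ONLINE replica, has at least one CONSUMING replica, and its server set changes.
  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
      Map<String, Map<String, String>> targetAssignment) {
    Set<String> moving = new HashSet<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      Map<String, String> targetStates = targetAssignment.get(entry.getKey());
      if (targetStates == null) {
        continue; // segment absent from the target assignment
      }
      boolean consumingOnly = targetStates.values().stream().noneMatch(ONLINE::equals)
          && targetStates.values().stream().anyMatch(CONSUMING::equals);
      if (consumingOnly && !entry.getValue().keySet().equals(targetStates.keySet())) {
        moving.add(entry.getKey());
      }
    }
    return moving;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> current = new HashMap<>();
    current.put("seg_0", Map.of("server1", ONLINE, "server2", ONLINE));
    current.put("seg_1", Map.of("server1", CONSUMING, "server2", CONSUMING));
    current.put("seg_2", Map.of("server1", CONSUMING, "server2", CONSUMING));

    Map<String, Map<String, String>> target = new HashMap<>();
    target.put("seg_0", Map.of("server1", ONLINE, "server3", ONLINE));       // ONLINE in target: skipped
    target.put("seg_1", Map.of("server1", CONSUMING, "server3", CONSUMING)); // server2 -> server3: moving
    target.put("seg_2", Map.of("server1", CONSUMING, "server2", CONSUMING)); // same servers: not moving

    System.out.println(getMovingConsumingSegments(current, target));
  }
}
```

Running `main` prints `[seg_1]`: `seg_0` is skipped because its target includes ONLINE replicas, and `seg_2` keeps the same servers, so only `seg_1` would be passed on as a candidate for force commit.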


