J-HowHuang commented on code in PR #16096:
URL: https://github.com/apache/pinot/pull/16096#discussion_r2152704788
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
_availableInstances = availableInstances;
}
}
+
+ @VisibleForTesting
+ static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+ Map<String, Map<String, String>> targetAssignment) {
+ Set<String> movingConsumingSegments = new HashSet<>();
+ for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> currentInstanceStateMap = entry.getValue();
+ Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+ if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+ .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+ .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+ if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+ movingConsumingSegments.add(segmentName);
+ }
+ }
+ }
+ return movingConsumingSegments;
+ }
+
+ private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+ @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+ if (_pinotLLCRealtimeSegmentManager != null) {
+ ForceCommitBatchConfig forceCommitBatchConfig =
+ ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+ segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+ segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+ forceCommitBatchConfig);
+ try {
+ // Wait until all committed segments have their status set to DONE.
+ // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+ // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+ // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+ // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+ // happens only after the committing segment status is set to DONE.
+ _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+ forceCommitBatchConfig);
+ } catch (Exception e) {
+ tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+ }
+ } else {
+ tableRebalanceLogger.warn(
+ "PinotLLCRealtimeSegmentManager is not initialized, cannot force commit consuming segments");
+ }
Review Comment:
Right, other than tests, `PinotTableRebalancer` is the only call path that
could rebalance without a realtime segment manager. Unless the
`PinotHelixResourceManager` has a null `_pinotLLCRealtimeSegmentManager`,
which is unlikely?
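
For reference, the selection logic of `getMovingConsumingSegments` in the diff above can be tried in isolation. This is only a minimal sketch: the class name, the segment and server names, and the plain `"ONLINE"`/`"CONSUMING"` string constants are assumptions standing in for `SegmentStateModel`, and nothing here touches the real controller classes.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MovingConsumingSegmentsSketch {
  // Assumed stand-ins for SegmentStateModel.ONLINE and SegmentStateModel.CONSUMING.
  static final String ONLINE = "ONLINE";
  static final String CONSUMING = "CONSUMING";

  // Same conditions as the method in the diff: the target has no ONLINE replica,
  // has at least one CONSUMING replica, and its instance set differs from the current one.
  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
      Map<String, Map<String, String>> targetAssignment) {
    Set<String> moving = new HashSet<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      Map<String, String> target = targetAssignment.get(entry.getKey());
      if (target == null) {
        continue;
      }
      boolean noneOnline = target.values().stream().noneMatch(ONLINE::equals);
      boolean anyConsuming = target.values().stream().anyMatch(CONSUMING::equals);
      if (noneOnline && anyConsuming && !entry.getValue().keySet().equals(target.keySet())) {
        moving.add(entry.getKey());
      }
    }
    return moving;
  }

  public static void main(String[] args) {
    // Hypothetical assignments: only seg__0__1 is consuming AND changes instances.
    Map<String, Map<String, String>> current = Map.of(
        "seg__0__0", Map.of("server1", ONLINE, "server2", ONLINE),
        "seg__0__1", Map.of("server1", CONSUMING, "server2", CONSUMING),
        "seg__1__1", Map.of("server1", CONSUMING, "server2", CONSUMING));
    Map<String, Map<String, String>> target = Map.of(
        "seg__0__0", Map.of("server1", ONLINE, "server3", ONLINE),
        "seg__0__1", Map.of("server1", CONSUMING, "server3", CONSUMING),
        "seg__1__1", Map.of("server1", CONSUMING, "server2", CONSUMING));
    System.out.println(getMovingConsumingSegments(current, target)); // prints [seg__0__1]
  }
}
```

In this sketch the completed segment that moves (`seg__0__0`) and the consuming segment that stays put (`seg__1__1`) are both skipped, so only `seg__0__1` would be passed on for a force commit.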