yashmayya commented on code in PR #16096:
URL: https://github.com/apache/pinot/pull/16096#discussion_r2153656792
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -665,6 +665,8 @@ public RebalanceResult rebalance(
       @QueryParam("retryInitialDelayInMs") long retryInitialDelayInMs,
       @ApiParam(value = "Whether to update segment target tier as part of the rebalance") @DefaultValue("false")
       @QueryParam("updateTargetTier") boolean updateTargetTier,
+      @ApiParam(value = "Do force commit on consuming segments before they are rebalanced") @DefaultValue("false")
+      @QueryParam("forceCommitBeforeMoved") boolean forceCommitBeforeMoved,
Review Comment:
+1 for renaming both the parameter name and all the internal names.
`forceCommit` also seems okay as long as the description for the parameter is
good. I think `forceCommitConsumingSegments` might better convey the intent to
users who aren't already familiar with what force commits are in the context of
realtime tables and consuming segments.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+      segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+          segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+          forceCommitBatchConfig);
+      try {
+        // Wait until all committed segments have their status set to DONE.
+        // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+        // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+        // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+        // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+        // happens only after the committing segment status is set to DONE.
+        _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+            forceCommitBatchConfig);
+      } catch (Exception e) {
+        tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+      }
+    } else {
+      tableRebalanceLogger.warn(
+          "PinotLLCRealtimeSegmentManager is not initialized, cannot force commit consuming segments");
+    }
Review Comment:
> Unless the PinotHelixResourceManager have a null _pinotLLCRealtimeSegmentManager, which is unlikely?

Yeah, this shouldn't be possible.

> we have a ticket to move the command to using the controller API. Perhaps that change can be made and then this code cleaned as part of that

Yeah, sounds good. If we already have a ticket for that, we can leave this part as is for now.
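To make the selection rule in `getMovingConsumingSegments` concrete, here is a standalone, JDK-only sketch that mirrors the logic in the diff. It is an illustration, not the Pinot code: the class name `MovingConsumingSegmentsSketch` is hypothetical, and it assumes the Helix state strings are `"ONLINE"` and `"CONSUMING"` in place of the `SegmentStateModel` constants.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical standalone mirror of the "moving consuming segment" check in the diff.
// Assumption: state strings "ONLINE" / "CONSUMING" stand in for SegmentStateModel constants.
final class MovingConsumingSegmentsSketch {
  static final String ONLINE = "ONLINE";
  static final String CONSUMING = "CONSUMING";

  private MovingConsumingSegmentsSketch() {
  }

  // Returns segments that are CONSUMING (with no ONLINE replica) in the target assignment
  // and whose set of hosting instances differs between the current and target assignments.
  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
      Map<String, Map<String, String>> targetAssignment) {
    Set<String> moving = new HashSet<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      Map<String, String> target = targetAssignment.get(entry.getKey());
      if (target == null) {
        continue; // segments absent from the target assignment are skipped, as in the diff
      }
      boolean consumingInTarget = target.values().stream().noneMatch(ONLINE::equals)
          && target.values().stream().anyMatch(CONSUMING::equals);
      // Only the instance set is compared; the states themselves are not.
      if (consumingInTarget && !entry.getValue().keySet().equals(target.keySet())) {
        moving.add(entry.getKey());
      }
    }
    return moving;
  }
}
```

A consuming segment that keeps the same replica set is not selected, so only segments that would actually be relocated are candidates for a force commit.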
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+      segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+          segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+          forceCommitBatchConfig);
+      try {
+        // Wait until all committed segments have their status set to DONE.
+        // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+        // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+        // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+        // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+        // happens only after the committing segment status is set to DONE.
+        _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+            forceCommitBatchConfig);
+      } catch (Exception e) {
+        tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+      }
Review Comment:
> My initiative was that this flag tries force commit to reduce the burden from the newly spinning up servers, so it's more like an add-on. If this add-on doesn't work the whole rebalance would just be a regular rebalance.

There could be use cases where the consumption offset lag introduced by rebalancing consuming segments is not acceptable, and the force commit config would be used precisely to prevent that. Ignoring force commit failures or timeouts and proceeding with the rebalance could violate data freshness SLAs in such cases.
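One way to act on this concern is to treat a failed force commit (or a failed wait for DONE) as a reason to stop, rather than falling back to a regular rebalance. The JDK-only sketch below illustrates that decision; it is not Pinot code, and `ForceCommitGuardSketch`, `ForceCommitter`, `Outcome`, and `forceCommitBeforeMove` are all hypothetical names. A real change would report the failure through the rebalancer's own result handling.

```java
import java.util.Set;

// Hypothetical sketch: abort the rebalance when force commit fails,
// instead of logging a warning and moving consuming segments anyway.
final class ForceCommitGuardSketch {
  // Hypothetical stand-in for "force commit these segments and wait until they are DONE".
  interface ForceCommitter {
    void forceCommitAndWait(Set<String> segments) throws Exception;
  }

  // Hypothetical outcome type; not Pinot's RebalanceResult.
  enum Outcome { PROCEED, ABORT }

  private ForceCommitGuardSketch() {
  }

  static Outcome forceCommitBeforeMove(ForceCommitter committer, Set<String> movingConsumingSegments) {
    if (movingConsumingSegments.isEmpty()) {
      return Outcome.PROCEED; // nothing to commit, so nothing can fail
    }
    try {
      committer.forceCommitAndWait(movingConsumingSegments);
      return Outcome.PROCEED;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return Outcome.ABORT;
    } catch (Exception e) {
      // Proceeding would move still-consuming segments and may add consumption lag,
      // which is exactly what the caller opted into force commit to avoid.
      return Outcome.ABORT;
    }
  }
}
```

Whether to abort outright or make this behavior configurable is a design choice left open in the thread; the sketch only shows the fail-closed variant.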
--
This is an automated message from the Apache Git Service.