uds5501 commented on code in PR #18954:
URL: https://github.com/apache/druid/pull/18954#discussion_r2732594000
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3392,6 +3403,53 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
}
});
+    // Phase 1 of scale-during-rollover: detect and set up.
+    // The taskCount change and re-allocation happen in Phase 2 after all tasks have stopped.
+    // We respect maxAllowedStops to avoid worker capacity exhaustion - rollover may take multiple cycles.
+    if (!futures.isEmpty() && taskAutoScaler != null && pendingRolloverTaskCount == null) {
+      int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
+      if (rolloverTaskCount > 0 && rolloverTaskCount != getIoConfig().getTaskCount()) {
+        log.info(
+            "Autoscaler recommends scaling to [%d] tasks during rollover for supervisor[%s]. "
+            + "Setting up pending rollover - will apply after all tasks stop.",
+            rolloverTaskCount, supervisorId
+        );
+
+        // Stop remaining active groups while respecting maxAllowedStops to avoid
+        // worker capacity exhaustion. Publishing tasks continue consuming worker slots,
+        // so stopping all at once could leave no capacity for new tasks.
+        int numPendingCompletionTaskGroups = pendingCompletionTaskGroups.values().stream()
+            .mapToInt(List::size).sum();
+        int availableStops = ioConfig.getMaxAllowedStops() - numPendingCompletionTaskGroups - numStoppedTasks.get();
+
+        int stoppedForRollover = 0;
+        for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
+          Integer groupId = entry.getKey();
+          if (!futureGroupIds.contains(groupId)) {
+            if (stoppedForRollover >= availableStops) {
+              log.info(
+                  "Deferring stop of taskGroup[%d] to next cycle - maxAllowedStops[%d] reached. "
+                  + "Publishing tasks: [%d], stopped this cycle: [%d].",
+                  groupId, ioConfig.getMaxAllowedStops(), numPendingCompletionTaskGroups, numStoppedTasks.get()
+              );
+              continue;
+            }
+            log.info(
+                "Stopping taskGroup[%d] for autoscaler rollover to [%d] tasks.",
+                groupId, rolloverTaskCount
+            );
+            futureGroupIds.add(groupId);
+            futures.add(checkpointTaskGroup(entry.getValue(), true));
+            stoppedForRollover++;
+          }
+        }
+
+        // Set the pending rollover flag - actual change applied in Phase 2
+        // when ALL actively reading task groups have stopped
+        pendingRolloverTaskCount = rolloverTaskCount;
Review Comment:
Is it possible to hit a scenario where `rolloverTaskCount` is set to `x`, but the number of task groups we actually stop is `y`, with `x` > `y`? That seems possible because of the max allowed stops config (`maxAllowedStops`).
If this happens, Phase 2 will never apply the change, and we will be stuck in a loop.
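To make the concern concrete, here is a minimal toy model of the two-phase flow. It is a sketch under stated assumptions, not Druid code: it assumes publishing tasks free their worker slots before the next cycle (so each cycle may stop up to `maxAllowedStops` groups), and it ignores normal task-duration rollover, so nothing but this code path stops deferred groups. `RolloverModel`, `runCycle` and the `retryWhilePending` switch are hypothetical names. With `retryWhilePending = false` it mirrors the diff, where the stop loop sits inside the `pendingRolloverTaskCount == null` guard and therefore only runs in the cycle that first sets the pending count.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical toy model of the two-phase rollover in the diff; not Druid code.
 * Assumes publishing tasks finish before the next cycle and ignores
 * duration-based rollover, so only this code path stops deferred groups.
 */
public class RolloverModel
{
  private final int maxAllowedStops;
  // Hypothetical switch: keep stopping deferred groups while a rollover is pending.
  private final boolean retryWhilePending;
  private final Deque<Integer> activelyReading = new ArrayDeque<>();
  private Integer pendingRolloverTaskCount = null;
  private int taskCount;

  public RolloverModel(int taskCount, int maxAllowedStops, boolean retryWhilePending)
  {
    this.taskCount = taskCount;
    this.maxAllowedStops = maxAllowedStops;
    this.retryWhilePending = retryWhilePending;
    for (int groupId = 0; groupId < taskCount; groupId++) {
      activelyReading.add(groupId);
    }
  }

  /** Simulates one supervisor cycle; returns true if the pending task count was applied. */
  public boolean runCycle(int recommendedTaskCount)
  {
    // Phase 2: apply the pending count only once no group is still reading.
    if (pendingRolloverTaskCount != null && activelyReading.isEmpty()) {
      taskCount = pendingRolloverTaskCount;
      pendingRolloverTaskCount = null;
      for (int groupId = 0; groupId < taskCount; groupId++) {
        activelyReading.add(groupId);
      }
      return true;
    }
    // Without retryWhilePending, stops only happen in the cycle that first sets the
    // pending count, mirroring the `pendingRolloverTaskCount == null` guard in the diff.
    boolean firstDetection = pendingRolloverTaskCount == null && recommendedTaskCount != taskCount;
    if (firstDetection || (retryWhilePending && pendingRolloverTaskCount != null)) {
      // Stop at most maxAllowedStops groups this cycle; the rest are deferred.
      for (int stopped = 0; stopped < maxAllowedStops && !activelyReading.isEmpty(); stopped++) {
        activelyReading.poll();
      }
      if (firstDetection) {
        pendingRolloverTaskCount = recommendedTaskCount;
      }
    }
    return false;
  }

  public int taskCount()
  {
    return taskCount;
  }

  public int activelyReadingCount()
  {
    return activelyReading.size();
  }

  public static void main(String[] args)
  {
    for (boolean retry : new boolean[]{false, true}) {
      RolloverModel model = new RolloverModel(4, 1, retry);
      boolean applied = false;
      for (int cycle = 0; cycle < 10 && !applied; cycle++) {
        applied = model.runCycle(6);
      }
      // Prints applied=false, stillReading=3 without retry; applied=true, taskCount=6 with it.
      System.out.printf(
          "retryWhilePending=%b applied=%b taskCount=%d stillReading=%d%n",
          retry, applied, model.taskCount(), model.activelyReadingCount()
      );
    }
  }
}
```

Under these assumptions, with 4 groups and `maxAllowedStops = 1`, the first run never applies the new count and leaves three groups reading, while retrying the budget-limited stops on each cycle drains all groups and applies the count on the fifth cycle. Whether the real supervisor gets stuck depends on whether duration-based rollover eventually stops the deferred groups, which this sketch deliberately leaves out.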
--
This is an automated message from the Apache Git Service.