kfaraz commented on code in PR #18860:
URL: https://github.com/apache/druid/pull/18860#discussion_r2644119398
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3376,7 +3391,6 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
}
}
});
-
Review Comment:
Please retain the newline for clean separation of code.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3338,6 +3352,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
final AtomicInteger numStoppedTasks = new AtomicInteger();
// Sort task groups by start time to prioritize early termination of earlier groups, then iterate for processing
+ // Sort task groups by start time to prioritize early termination of earlier groups, then iterate for processing
Review Comment:
Duplicate comment.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3428,6 +3442,17 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
// remove this task group from the list of current task groups now that it has been handled
activelyReadingTaskGroups.remove(groupId);
}
+
+ if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
+ int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
+ if (rolloverTaskCount > 0 && rolloverTaskCount < ioConfig.getTaskCount()) {
+ log.info("Cost-based autoscaler recommends scaling down to [%d] tasks during rollover", rolloverTaskCount);
Review Comment:
```suggestion
log.info("Autoscaler recommends scaling down to [%d] tasks during rollover.", rolloverTaskCount);
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4330,6 +4355,15 @@ public SeekableStreamSupervisorIOConfig getIoConfig()
return ioConfig;
}
+ /**
+ * Sets the autoscaler reference for rollover-based scale-down decisions.
+ * Called by {@link SupervisorManager} after supervisor creation.
+ */
public void setTaskAutoScaler(@Nullable SupervisorTaskAutoScaler taskAutoScaler)
Review Comment:
Is this still needed?
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3428,6 +3442,17 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
// remove this task group from the list of current task groups now that it has been handled
activelyReadingTaskGroups.remove(groupId);
}
+
+ if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
+ int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
+ if (rolloverTaskCount > 0 && rolloverTaskCount < ioConfig.getTaskCount()) {
Review Comment:
Should we also allow scale up on task rollover?
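If scale-up on rollover were allowed, the guard could accept any positive recommendation and clamp it to configured bounds, instead of requiring `rolloverTaskCount < ioConfig.getTaskCount()`. A minimal sketch, assuming hypothetical `minTaskCount`/`maxTaskCount` bounds passed in as plain ints and a hypothetical helper `resolveRolloverTaskCount` (not part of the PR or of Druid's API). A non-positive recommendation is treated as "no change", matching the `-1` that `computeOptimalTaskCount` returns when it has no recommendation:

```java
// Hypothetical helper for illustration only; not part of the PR or of Druid's API.
final class RolloverScaling
{
  private RolloverScaling()
  {
  }

  /**
   * Returns the task count to use at rollover. Both scale-up and scale-down are
   * allowed; the result is clamped to [minTaskCount, maxTaskCount].
   *
   * @param recommended      value from the autoscaler; a non-positive value means "no recommendation"
   * @param currentTaskCount task count currently configured
   * @param minTaskCount     assumed lower bound on the task count
   * @param maxTaskCount     assumed upper bound on the task count
   */
  static int resolveRolloverTaskCount(int recommended, int currentTaskCount, int minTaskCount, int maxTaskCount)
  {
    if (recommended <= 0) {
      // No recommendation (e.g. -1 from computeOptimalTaskCount): keep the current count.
      return currentTaskCount;
    }
    // Clamp into the configured bounds, in either direction.
    return Math.max(minTaskCount, Math.min(maxTaskCount, recommended));
  }
}
```

Clamping, rather than rejecting out-of-range values, keeps a slightly overshooting recommendation useful while still respecting the bounds.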
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java:
##########
@@ -86,6 +87,11 @@ default Boolean isHealthy()
return null; // default implementation for interface compatability; returning null since true or false is misleading
}
+ default SupervisorTaskAutoScaler createAutoscaler()
+ {
+ return null;
+ }
Review Comment:
It seems a little untidy, but the default impl should do the same thing that the existing impl does, so that we do not break extensions that use auto-scalers.
```suggestion
default SupervisorTaskAutoScaler createAutoscaler(SupervisorSpec spec)
{
return spec.createAutoscaler(this);
}
```
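Why delegating in the default method preserves compatibility: a Java default method is inherited by every implementation that does not override it, so existing extension supervisors keep receiving the autoscaler their spec creates instead of silently getting `null`. A self-contained illustration using simplified, hypothetical stand-in interfaces (`DemoSupervisor`, `DemoSpec`, `DemoAutoScaler`), not Druid's real `Supervisor`, `SupervisorSpec` or `SupervisorTaskAutoScaler`:

```java
// Simplified, hypothetical stand-ins; the real Druid interfaces have many more methods.
interface DemoAutoScaler
{
  String name();
}

interface DemoSpec
{
  // Pre-existing factory method that extensions already implement.
  DemoAutoScaler createAutoscaler(DemoSupervisor supervisor);
}

interface DemoSupervisor
{
  // New hook: by default, delegate to the spec so supervisors that do not
  // override it keep the old behaviour.
  default DemoAutoScaler createAutoscaler(DemoSpec spec)
  {
    return spec.createAutoscaler(this);
  }
}

// An "extension" supervisor written before the new hook existed.
class LegacySupervisor implements DemoSupervisor
{
}

// A supervisor that opts in to its own autoscaler.
class CustomSupervisor implements DemoSupervisor
{
  @Override
  public DemoAutoScaler createAutoscaler(DemoSpec spec)
  {
    return () -> "custom";
  }
}
```

With a `null`-returning default, `LegacySupervisor` would lose its autoscaler without any compile error, which is the breakage the suggestion avoids.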
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -129,46 +125,25 @@ public void reset()
// No-op.
}
- private CostMetrics collectMetrics()
+ @Override
+ public int computeTaskCountForRollover()
{
- if (spec.isSuspended()) {
- log.debug("Supervisor [%s] is suspended, skipping a metrics collection", supervisorId);
- return null;
- }
-
- final LagStats lagStats = supervisor.computeLagStats();
- if (lagStats == null) {
- log.debug("Lag stats unavailable for supervisorId [%s], skipping collection", supervisorId);
- return null;
- }
-
- final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
- final int partitionCount = supervisor.getPartitionCount();
-
- final Map<String, Map<String, Object>> taskStats = supervisor.getStats();
- final double movingAvgRate = extractMovingAverage(taskStats, DropwizardRowIngestionMeters.ONE_MINUTE_NAME);
- final double pollIdleRatio = extractPollIdleRatio(taskStats);
+ return computeOptimalTaskCount(collectMetrics());
Review Comment:
I wonder if, for this method, we should just reuse the metrics collected in the last cycle. Metrics collection may be slow, since the supervisor might need to contact all the running tasks. This would slow down the task rollover process, causing ingestion lag.
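One way to reuse the last cycle's metrics: the periodic scaling cycle stores what it collected, and rollover reads that snapshot without blocking, treating a missing or stale snapshot as "no recommendation". A minimal sketch under those assumptions; `LastCycleMetrics`, `update`, `getIfFresh` and the `maxAgeMillis` staleness threshold are hypothetical, and the clock is injected only to make the sketch testable:

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

/**
 * Hypothetical holder for the metrics collected in the autoscaler's last periodic
 * cycle, so rollover can reuse them instead of contacting every running task again.
 */
final class LastCycleMetrics<T>
{
  private static final class Snapshot<T>
  {
    final T metrics;
    final long collectedAtMillis;

    Snapshot(T metrics, long collectedAtMillis)
    {
      this.metrics = metrics;
      this.collectedAtMillis = collectedAtMillis;
    }
  }

  private final AtomicReference<Snapshot<T>> latest = new AtomicReference<>();
  private final LongSupplier clockMillis;

  LastCycleMetrics(LongSupplier clockMillis)
  {
    this.clockMillis = clockMillis;
  }

  /** Called at the end of each periodic collection cycle; replaces the previous snapshot. */
  void update(T metrics)
  {
    latest.set(new Snapshot<>(metrics, clockMillis.getAsLong()));
  }

  /** Returns the cached metrics if they are at most maxAgeMillis old; never blocks. */
  Optional<T> getIfFresh(long maxAgeMillis)
  {
    Snapshot<T> snapshot = latest.get();
    if (snapshot == null || clockMillis.getAsLong() - snapshot.collectedAtMillis > maxAgeMillis) {
      return Optional.empty();
    }
    return Optional.ofNullable(snapshot.metrics);
  }
}
```

Rollover could then call something like `lastCycleMetrics.getIfFresh(maxAgeMillis).map(this::computeOptimalTaskCount).orElse(-1)` (hypothetical wiring) instead of `computeOptimalTaskCount(collectMetrics())`. The `AtomicReference` swap keeps the read lock-free, so rollover never waits on task RPCs.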
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -204,50 +179,46 @@ public int computeOptimalTaskCount(CostMetrics metrics)
return -1;
}
- // If idle is already in the ideal range [0.2, 0.6], optimal utilization has been achieved.
- // No scaling is needed - maintain stability by staying at the current task count.
- final double currentIdleRatio = metrics.getPollIdleRatio();
- if (currentIdleRatio >= 0 && WeightedCostFunction.isIdleInIdealRange(currentIdleRatio)) {
- log.debug(
- "Idle ratio [%.3f] is in ideal range for supervisorId [%s], no scaling needed",
- currentIdleRatio,
- supervisorId
- );
- return -1;
- }
-
int optimalTaskCount = -1;
- double optimalCost = Double.POSITIVE_INFINITY;
+ CostResult optimalCost = new CostResult();
for (int taskCount : validTaskCounts) {
- double cost = costFunction.computeCost(metrics, taskCount, config);
- log.debug("Proposed task count: %d, Cost: %.4f", taskCount, cost);
- if (cost < optimalCost) {
+ CostResult costResult = costFunction.computeCost(metrics, taskCount, config);
+ double cost = costResult.totalCost();
+ log.debug(
+ "Proposed task count: %d, Cost: %.4f (lag: %.4f, idle: %.4f)",
+ taskCount,
+ cost,
+ costResult.lagCost(),
+ costResult.idleCost()
+ );
+ if (cost < optimalCost.totalCost()) {
optimalTaskCount = taskCount;
- optimalCost = cost;
+ optimalCost = costResult;
}
}
- emitter.emit(metricBuilder.setMetric(AVG_LAG_METRIC, metrics.getAvgPartitionLag()));
- emitter.emit(metricBuilder.setMetric(AVG_IDLE_METRIC, metrics.getPollIdleRatio()));
emitter.emit(metricBuilder.setMetric(OPTIMAL_TASK_COUNT_METRIC, (long) optimalTaskCount));
+ emitter.emit(metricBuilder.setMetric(LAG_COST_METRIC, optimalCost.lagCost()));
+ emitter.emit(metricBuilder.setMetric(IDLE_COST_METRIC, optimalCost.idleCost()));
log.debug(
"Cost-based scaling evaluation for supervisorId [%s]: current=%d, optimal=%d, cost=%.4f, "
+ "avgPartitionLag=%.2f, pollIdleRatio=%.3f",
supervisorId,
metrics.getCurrentTaskCount(),
optimalTaskCount,
- optimalCost,
+ optimalCost.totalCost(),
metrics.getAvgPartitionLag(),
metrics.getPollIdleRatio()
);
+
Review Comment:
Nit: extra newline?
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3428,6 +3442,17 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
// remove this task group from the list of current task groups now that it has been handled
activelyReadingTaskGroups.remove(groupId);
}
+
+ if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
Review Comment:
Please move all of this new logic into a new private method and add a short javadoc to it.
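A sketch of the requested extraction on a heavily simplified, hypothetical class. The field names `taskAutoScaler` and `activelyReadingTaskGroups` and the guard condition come from the diff; `SupervisorSketch`, `RolloverAutoScaler`, the method name `maybeScaleDownDuringRollover`, and the way the new task count is applied are assumptions for illustration (the diff does not show what happens after the log line):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SupervisorTaskAutoScaler, reduced to the one method used here.
interface RolloverAutoScaler
{
  int computeTaskCountForRollover();
}

// Heavily simplified sketch; only the field names and the guard mirror the diff.
class SupervisorSketch
{
  final Map<Integer, Object> activelyReadingTaskGroups = new HashMap<>();
  private final RolloverAutoScaler taskAutoScaler; // null when no autoscaler is configured
  private int taskCount;

  SupervisorSketch(RolloverAutoScaler taskAutoScaler, int taskCount)
  {
    this.taskAutoScaler = taskAutoScaler;
    this.taskCount = taskCount;
  }

  int getTaskCount()
  {
    return taskCount;
  }

  void checkTaskDuration()
  {
    // Stand-in for the existing loop, which removes each handled group.
    activelyReadingTaskGroups.clear();

    maybeScaleDownDuringRollover();
  }

  /**
   * Applies the autoscaler's rollover recommendation once all actively reading
   * task groups have been handled. Only scale-down is applied, matching the
   * condition in the PR; does nothing if no autoscaler is configured.
   */
  private void maybeScaleDownDuringRollover()
  {
    if (taskAutoScaler == null || !activelyReadingTaskGroups.isEmpty()) {
      return;
    }
    int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
    if (rolloverTaskCount > 0 && rolloverTaskCount < taskCount) {
      System.out.printf("Autoscaler recommends scaling down to [%d] tasks during rollover.%n", rolloverTaskCount);
      taskCount = rolloverTaskCount; // assumption: the real code updates the ioConfig task count
    }
  }
}
```

The early return keeps `checkTaskDuration` flat, and the javadoc states both preconditions in one place.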
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.