kfaraz commented on code in PR #18860:
URL: https://github.com/apache/druid/pull/18860#discussion_r2644119398


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3376,7 +3391,6 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
             }
           }
         });
-

Review Comment:
   Please retain the newline for clean separation of code.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3338,6 +3352,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
 
     final AtomicInteger numStoppedTasks = new AtomicInteger();
     // Sort task groups by start time to prioritize early termination of earlier groups, then iterate for processing
+    // Sort task groups by start time to prioritize early termination of earlier groups, then iterate for processing

Review Comment:
   Duplicate comment.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3428,6 +3442,17 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
       // remove this task group from the list of current task groups now that it has been handled
       activelyReadingTaskGroups.remove(groupId);
     }
+
+    if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
+      int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
+      if (rolloverTaskCount > 0 && rolloverTaskCount < ioConfig.getTaskCount()) {
+        log.info("Cost-based autoscaler recommends scaling down to [%d] tasks during rollover", rolloverTaskCount);

Review Comment:
   ```suggestion
           log.info("Autoscaler recommends scaling down to [%d] tasks during rollover.", rolloverTaskCount);
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4330,6 +4355,15 @@ public SeekableStreamSupervisorIOConfig getIoConfig()
     return ioConfig;
   }
 
+  /**
+   * Sets the autoscaler reference for rollover-based scale-down decisions.
+   * Called by {@link SupervisorManager} after supervisor creation.
+   */
+  public void setTaskAutoScaler(@Nullable SupervisorTaskAutoScaler taskAutoScaler)

Review Comment:
   Is this still needed?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3428,6 +3442,17 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
       // remove this task group from the list of current task groups now that it has been handled
       activelyReadingTaskGroups.remove(groupId);
     }
+
+    if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
+      int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
+      if (rolloverTaskCount > 0 && rolloverTaskCount < ioConfig.getTaskCount()) {

Review Comment:
   Should we also allow scale up on task rollover?
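A minimal sketch of what allowing both directions at rollover could look like. The helper `RolloverTaskCountDecider` and its `minTaskCount`/`maxTaskCount` bounds are hypothetical; the sketch assumes, as the `rolloverTaskCount > 0` check in the diff suggests, that a non-positive recommendation means "no change".

```java
/**
 * Hypothetical helper (not part of the PR): picks the task count to apply at rollover,
 * permitting both scale-up and scale-down within configured bounds.
 */
final class RolloverTaskCountDecider
{
  private RolloverTaskCountDecider()
  {
  }

  static int decide(int recommended, int currentTaskCount, int minTaskCount, int maxTaskCount)
  {
    // Assumption: <= 0 means the autoscaler has no recommendation.
    if (recommended <= 0 || recommended == currentTaskCount) {
      return currentTaskCount;
    }
    // Clamp so that neither direction escapes the configured limits.
    return Math.max(minTaskCount, Math.min(maxTaskCount, recommended));
  }
}
```

Compared with the `rolloverTaskCount < ioConfig.getTaskCount()` check, the only change is dropping the direction restriction and relying on the bounds instead.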



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java:
##########
@@ -86,6 +87,11 @@ default Boolean isHealthy()
     return null; // default implementation for interface compatability; returning null since true or false is misleading
   }
 
+  default SupervisorTaskAutoScaler createAutoscaler()
+  {
+    return null;
+  }

Review Comment:
   It seems a little untidy, but the default impl should do the same thing as the existing impl, so that we do not break extensions that use auto-scalers.
   
   ```suggestion
     default SupervisorTaskAutoScaler createAutoscaler(SupervisorSpec spec)
     {
       return spec.createAutoscaler(this);
     }
   ```
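To illustrate why delegating in the default method keeps existing extensions working, here is a sketch using hypothetical stand-in interfaces (`DemoSupervisor`, `DemoSupervisorSpec`, `DemoAutoScaler`) rather than the real Druid types: an implementation that never overrides the new method still receives whatever its spec's existing factory returns.

```java
/** Hypothetical stand-in for an autoscaler. */
interface DemoAutoScaler
{
  String name();
}

/** Hypothetical stand-in for a spec whose existing factory extensions may already implement. */
interface DemoSupervisorSpec
{
  DemoAutoScaler createAutoscaler(DemoSupervisor supervisor);
}

/** Hypothetical stand-in for the supervisor interface gaining the new method. */
interface DemoSupervisor
{
  /** Default preserves the old behavior by delegating to the spec's factory. */
  default DemoAutoScaler createAutoscaler(DemoSupervisorSpec spec)
  {
    return spec.createAutoscaler(this);
  }
}
```

Supervisors that do override the method can still build their autoscaler differently; only non-overriding implementations fall back to the spec.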



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -129,46 +125,25 @@ public void reset()
     // No-op.
   }
 
-  private CostMetrics collectMetrics()
+  @Override
+  public int computeTaskCountForRollover()
   {
-    if (spec.isSuspended()) {
-      log.debug("Supervisor [%s] is suspended, skipping a metrics collection", supervisorId);
-      return null;
-    }
-
-    final LagStats lagStats = supervisor.computeLagStats();
-    if (lagStats == null) {
-      log.debug("Lag stats unavailable for supervisorId [%s], skipping collection", supervisorId);
-      return null;
-    }
-
-    final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
-    final int partitionCount = supervisor.getPartitionCount();
-
-    final Map<String, Map<String, Object>> taskStats = supervisor.getStats();
-    final double movingAvgRate = extractMovingAverage(taskStats, DropwizardRowIngestionMeters.ONE_MINUTE_NAME);
-    final double pollIdleRatio = extractPollIdleRatio(taskStats);
+    return computeOptimalTaskCount(collectMetrics());

Review Comment:
   I wonder if for this method we shouldn't just reuse the metrics collected in the last cycle.
   Metrics collection may be slow since the supervisor might need to contact all the running tasks.
   This would slow down the task rollover process, causing ingestion lag.
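A sketch of the reuse idea, assuming the periodic scaling cycle and the rollover path can share an `AtomicReference`. `CachedMetricsScaler` and `runScheduledCycle` are hypothetical names; the `collector` and `optimizer` functions stand in for `collectMetrics()` and `computeOptimalTaskCount(...)`.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;

/** Hypothetical sketch: rollover reuses the last metrics snapshot instead of collecting again. */
final class CachedMetricsScaler<M>
{
  private final Supplier<M> collector;      // potentially slow: may contact every running task
  private final ToIntFunction<M> optimizer; // e.g. computeOptimalTaskCount
  private final AtomicReference<M> lastMetrics = new AtomicReference<>();

  CachedMetricsScaler(Supplier<M> collector, ToIntFunction<M> optimizer)
  {
    this.collector = collector;
    this.optimizer = optimizer;
  }

  /** Periodic cycle: collects fresh metrics, remembers them, and evaluates them. */
  int runScheduledCycle()
  {
    final M metrics = collector.get();
    if (metrics == null) {
      return -1;
    }
    lastMetrics.set(metrics);
    return optimizer.applyAsInt(metrics);
  }

  /** Rollover path: no collection; returns -1 if no cycle has completed yet. */
  int computeTaskCountForRollover()
  {
    final M metrics = lastMetrics.get();
    return metrics == null ? -1 : optimizer.applyAsInt(metrics);
  }
}
```

The trade-off is staleness: the rollover decision can trail the latest state by up to one scaling period, which a max-age check on the cached snapshot could bound.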



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -204,50 +179,46 @@ public int computeOptimalTaskCount(CostMetrics metrics)
       return -1;
     }
 
-    // If idle is already in the ideal range [0.2, 0.6], optimal utilization has been achieved.
-    // No scaling is needed - maintain stability by staying at the current task count.
-    final double currentIdleRatio = metrics.getPollIdleRatio();
-    if (currentIdleRatio >= 0 && WeightedCostFunction.isIdleInIdealRange(currentIdleRatio)) {
-      log.debug(
-          "Idle ratio [%.3f] is in ideal range for supervisorId [%s], no scaling needed",
-          currentIdleRatio,
-          supervisorId
-      );
-      return -1;
-    }
-
     int optimalTaskCount = -1;
-    double optimalCost = Double.POSITIVE_INFINITY;
+    CostResult optimalCost = new CostResult();
 
     for (int taskCount : validTaskCounts) {
-      double cost = costFunction.computeCost(metrics, taskCount, config);
-      log.debug("Proposed task count: %d, Cost: %.4f", taskCount, cost);
-      if (cost < optimalCost) {
+      CostResult costResult = costFunction.computeCost(metrics, taskCount, config);
+      double cost = costResult.totalCost();
+      log.debug(
+          "Proposed task count: %d, Cost: %.4f (lag: %.4f, idle: %.4f)",
+          taskCount,
+          cost,
+          costResult.lagCost(),
+          costResult.idleCost()
+      );
+      if (cost < optimalCost.totalCost()) {
         optimalTaskCount = taskCount;
-        optimalCost = cost;
+        optimalCost = costResult;
       }
     }
 
-    emitter.emit(metricBuilder.setMetric(AVG_LAG_METRIC, metrics.getAvgPartitionLag()));
-    emitter.emit(metricBuilder.setMetric(AVG_IDLE_METRIC, metrics.getPollIdleRatio()));
     emitter.emit(metricBuilder.setMetric(OPTIMAL_TASK_COUNT_METRIC, (long) optimalTaskCount));
+    emitter.emit(metricBuilder.setMetric(LAG_COST_METRIC, optimalCost.lagCost()));
+    emitter.emit(metricBuilder.setMetric(IDLE_COST_METRIC, optimalCost.idleCost()));
 
     log.debug(
         "Cost-based scaling evaluation for supervisorId [%s]: current=%d, optimal=%d, cost=%.4f, "
         + "avgPartitionLag=%.2f, pollIdleRatio=%.3f",
         supervisorId,
         metrics.getCurrentTaskCount(),
         optimalTaskCount,
-        optimalCost,
+        optimalCost.totalCost(),
         metrics.getAvgPartitionLag(),
         metrics.getPollIdleRatio()
     );
 
+

Review Comment:
   Nit: extra newline?
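In this hunk, `new CostResult()` seeds the minimum search, so `cost < optimalCost.totalCost()` only selects a candidate if the no-arg constructor reports an infinite total (the removed code used `Double.POSITIVE_INFINITY` for the same purpose). A minimal sketch of that invariant with simplified, hypothetical types; the PR's actual `CostResult` and cost-function signature may differ.

```java
import java.util.List;

/** Simplified, hypothetical stand-in for the PR's CostResult. */
final class CostResult
{
  private final double totalCost;
  private final double lagCost;
  private final double idleCost;

  /** "No candidate yet" sentinel: an infinite total means any finite cost beats it. */
  CostResult()
  {
    this(Double.POSITIVE_INFINITY, 0.0, 0.0);
  }

  CostResult(double totalCost, double lagCost, double idleCost)
  {
    this.totalCost = totalCost;
    this.lagCost = lagCost;
    this.idleCost = idleCost;
  }

  double totalCost()
  {
    return totalCost;
  }

  double lagCost()
  {
    return lagCost;
  }

  double idleCost()
  {
    return idleCost;
  }
}

/** Hypothetical cost function shape: one evaluation per candidate task count. */
interface TaskCountCostFunction
{
  CostResult computeCost(int taskCount);
}

final class OptimalTaskCountPicker
{
  private OptimalTaskCountPicker()
  {
  }

  /** Returns the candidate with the lowest total cost, or -1 if there are no candidates. */
  static int pick(List<Integer> validTaskCounts, TaskCountCostFunction costFunction)
  {
    int optimalTaskCount = -1;
    CostResult optimalCost = new CostResult();
    for (int taskCount : validTaskCounts) {
      final CostResult costResult = costFunction.computeCost(taskCount);
      if (costResult.totalCost() < optimalCost.totalCost()) {
        optimalTaskCount = taskCount;
        optimalCost = costResult;
      }
    }
    return optimalTaskCount;
  }
}
```

If the sentinel defaulted to a total of `0.0` instead, no non-negative cost would ever win and the method would always return `-1`. Keeping the sentinel's lag and idle components at zero also avoids emitting infinite values for `LAG_COST_METRIC` and `IDLE_COST_METRIC` when no candidate is selected.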



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3428,6 +3442,17 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
       // remove this task group from the list of current task groups now that it has been handled
       activelyReadingTaskGroups.remove(groupId);
     }
+
+    if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {

Review Comment:
   Please move this entire new logic into a new private method and add a short javadoc to it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
