gianm commented on code in PR #19369:
URL: https://github.com/apache/druid/pull/19369#discussion_r3264221758


##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java:
##########
@@ -19,17 +19,31 @@
 
 package org.apache.druid.indexing.overlord.supervisor.autoscaler;
 
+/**
+ * Task-count auto-scaler driven by a streaming supervisor.
+ * <p>
+ * Scaler return-value contract for {@link #computeTaskCountForRollover()} and any
+ * implementation-specific scale-action method:
+ * <ul>
+ *   <li>{@code -1} — error case: metrics unavailable or an answer cannot be computed. The
+ *       supervisor will skip scaling and emit a failure metric.</li>
+ *   <li>Otherwise — the scaler's preferred task count, <i>unclamped</i> by configured min/max

Review Comment:
   IMO this should require that the task count be at least 1.
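   A minimal sketch of how the "at least 1" requirement could be made checkable (for example from each implementation's unit tests) rather than only documented. `TaskCountContractSketch` and `honorsContract` are hypothetical names, not part of the PR; the only assumption is the contract as proposed here: `-1` for the error case, otherwise a count of at least 1.

```java
// Hypothetical helper illustrating the proposed contract; not part of the PR.
class TaskCountContractSketch {
  /**
   * True if a scaler's return value honors the proposed contract:
   * either the -1 error sentinel or a preferred task count of at least 1.
   * Zero and any other negative value are contract violations.
   */
  static boolean honorsContract(int returnedTaskCount) {
    return returnedTaskCount == -1 || returnedTaskCount >= 1;
  }

  public static void main(String[] args) {
    // Print the verdict for a few representative return values.
    for (int v : new int[]{-1, 0, 1, 7, -5}) {
      System.out.println(v + " -> " + honorsContract(v));
    }
  }
}
```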



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java:
##########
@@ -19,17 +19,31 @@
 
 package org.apache.druid.indexing.overlord.supervisor.autoscaler;
 
+/**
+ * Task-count auto-scaler driven by a streaming supervisor.
+ * <p>
+ * Scaler return-value contract for {@link #computeTaskCountForRollover()} and any
+ * implementation-specific scale-action method:
+ * <ul>
+ *   <li>{@code -1} — error case: metrics unavailable or an answer cannot be computed. The

Review Comment:
   nit: would be nice to define this as a `static int CANNOT_COMPUTE = -1` on 
the interface, so implementations can `return CANNOT_COMPUTE;`.
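   A sketch of the `CANNOT_COMPUTE` suggestion on a trimmed-down, hypothetical stand-in for the interface (only `computeTaskCountForRollover()` is kept; `fixedScaler` is an illustrative implementation, not Druid code). Fields declared in a Java interface are implicitly `public static final`, so the `static` in the suggestion is redundant but harmless.

```java
// Hypothetical, trimmed-down stand-in for SupervisorTaskAutoScaler; not the real interface.
class CannotComputeSketch {
  interface TaskCountScaler {
    // Implicitly public static final: the error sentinel from the Javadoc contract.
    int CANNOT_COMPUTE = -1;

    int computeTaskCountForRollover();
  }

  // Illustrative implementation: signals the error case when there are no partitions,
  // otherwise returns a fixed preferred count. Both inputs are stand-ins for real metrics.
  static TaskCountScaler fixedScaler(int partitionCount, int preferred) {
    return () -> partitionCount <= 0 ? TaskCountScaler.CANNOT_COMPUTE : preferred;
  }

  public static void main(String[] args) {
    System.out.println(fixedScaler(0, 4).computeTaskCountForRollover()); // -1
    System.out.println(fixedScaler(8, 4).computeTaskCountForRollover()); // 4
  }
}
```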



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -503,29 +503,56 @@ public void handle()
           final int desiredTaskCount = computeDesiredTaskCount.call();
           final int currentTaskCount = getCurrentTaskCount();
 
-          if (desiredTaskCount <= 0) {
-            return;
-          }
-
           ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
                                                                .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
                                                                .setDimension(DruidMetrics.DATASOURCE, dataSource)
                                                                .setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
 
-          // 1) This should already be handled by the auto-scaler implementation, but make sure we catch/record these for auditability
-          if (desiredTaskCount == currentTaskCount) {
+          // Negative return is the scaler's error signal (see SupervisorTaskAutoScaler).
+          if (desiredTaskCount < 0) {

Review Comment:
   Why not `<= 0`? Zero tasks seems pathological to me too.
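   A sketch of the supervisor-side check using `<= 0` instead of `< 0`, followed by clamping into the configured range (the Javadoc contract says the scaler's value is unclamped). `ScaleDecisionSketch`, `clampedTarget`, and passing `taskCountMin`/`taskCountMax` as plain parameters are illustrative assumptions, not the actual `SeekableStreamSupervisor` code; `OptionalInt.empty()` stands in for "skip scaling and emit the failure metric".

```java
import java.util.OptionalInt;

// Hypothetical sketch of the supervisor-side handling; names are illustrative only.
class ScaleDecisionSketch {
  /**
   * Returns the target task count clamped to [taskCountMin, taskCountMax],
   * or empty if the scaler's answer must be rejected.
   */
  static OptionalInt clampedTarget(int desiredTaskCount, int taskCountMin, int taskCountMax) {
    // Reject 0 as well as negatives: zero tasks is treated as pathological, like -1.
    if (desiredTaskCount <= 0) {
      return OptionalInt.empty();
    }
    // Clamp the scaler's unclamped preference into the configured range.
    return OptionalInt.of(Math.min(taskCountMax, Math.max(taskCountMin, desiredTaskCount)));
  }

  public static void main(String[] args) {
    System.out.println(clampedTarget(0, 1, 10));  // OptionalInt.empty
    System.out.println(clampedTarget(25, 1, 10)); // OptionalInt[10]
  }
}
```

Keeping the rejection and the clamp in one place means an implementation that accidentally returns 0 is reported as a scaler error instead of being silently bumped up to `taskCountMin`.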



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java:
##########
@@ -237,74 +212,31 @@ int computeDesiredTaskCount(List<Long> lags)
     double beyondProportion = beyond * 1.0 / metricsCount;
     double withinProportion = within * 1.0 / metricsCount;
 
-    log.debug("Calculated beyondProportion is [%s] and withinProportion is [%s] for supervisor[%s].", beyondProportion,
-        withinProportion, spec.getId()
+    log.debug(
+        "Calculated beyondProportion is [%s] and withinProportion is [%s] for supervisor[%s].",
+        beyondProportion,
+        withinProportion,
+        spec.getId()
     );
 
-    int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount();
-    int desiredActiveTaskCount;
     final int partitionCount = supervisor.getPartitionCount();
     if (partitionCount <= 0) {
       log.warn("Partition number for supervisor[%s] <= 0 ? how can it be?", spec.getId());
       return -1;
     }
 
-    final int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
-    final int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
-
-    // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
-    // There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
-    // If that is happening, take the bound and return early.
-    final boolean isTaskCountOutOfBounds = currentActiveTaskCount < actualTaskCountMin
-                                           || currentActiveTaskCount > actualTaskCountMax;
-    if (isTaskCountOutOfBounds) {
-      currentActiveTaskCount = Math.min(actualTaskCountMax, Math.max(actualTaskCountMin, currentActiveTaskCount));
-      return currentActiveTaskCount;
-    }
+    final int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount();
 
     if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
-      // Do Scale out
-      final int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
-      if (currentActiveTaskCount == actualTaskCountMax) {
-        log.debug(
-            "CurrentActiveTaskCount reached task count Max limit, skipping scale out action for supervisor[%s].",
-            spec.getId()
-        );
-        emitter.emit(metricBuilder
-                         .setDimension(
-                             SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
-                             "Already at max task count"
-                         )
-                         .setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
-        return -1;
-      } else {
-        desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax);
-      }
-      return desiredActiveTaskCount;
+      // scale-out: step up from current, capped by partition count (scaler-internal constraint)
+      return Math.min(currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep(), partitionCount);
     }
-
     if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
-      // Do Scale in
-      final int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
-      if (currentActiveTaskCount == actualTaskCountMin) {
-        log.debug(
-            "CurrentActiveTaskCount reached task count Min limit[%d], skipping scale in action for supervisor[%s].",
-            actualTaskCountMin,
-            spec.getId()
-        );
-        emitter.emit(metricBuilder
-                         .setDimension(
-                             SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
-                             "Already at min task count"
-                         )
-                         .setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
-        return -1;
-      } else {
-        desiredActiveTaskCount = Math.max(taskCount, actualTaskCountMin);
-      }
-      return desiredActiveTaskCount;
+      // scale-in: step down from current (the supervisor will clamp to taskCountMin)
+      return currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();

Review Comment:
   I think this needs `max(1, ...)` to avoid returning a zero or negative task
count when `scaleInStep >= currentActiveTaskCount`. The supervisor would clamp
the result to `taskCountMin`, but it never gets that far if the returned number
trips the "Auto-scaler returned pathological taskCount" check first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
