kfaraz commented on code in PR #18860:
URL: https://github.com/apache/druid/pull/18860#discussion_r2639781962


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+/**
+ * Holds the result of a cost computation from {@link WeightedCostFunction#computeCost}.
+ * All costs are measured in seconds.
+ */
+public class CostResult

Review Comment:
   Thanks for adding this.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3339,43 +3346,43 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
     final AtomicInteger numStoppedTasks = new AtomicInteger();
     // Sort task groups by start time to prioritize early termination of earlier groups, then iterate for processing
     activelyReadingTaskGroups.entrySet().stream().sorted(
-            Comparator.comparingLong(

Review Comment:
   Please revert all the formatting changes in this file.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -504,6 +504,11 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe
       supervisor = spec.createSupervisor();
       autoscaler = spec.createAutoscaler(supervisor);
 
+      // Wire autoscaler back to supervisor for rollover-based scale-down
+      if (supervisor instanceof SeekableStreamSupervisor && autoscaler != null) {

Review Comment:
   It feels weird to first create the auto-scaler and then inject it back into the supervisor.
   
   How about we add a `createTaskAutoScaler()` method on the `Supervisor` interface itself?
   Internally, this method would simply call `spec.createAutoscaler(this)` and initialize the supervisor's own auto-scaler field if needed.
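   A minimal sketch of this ownership pattern, using simplified stand-in types rather than the real Druid `Supervisor`/`SupervisorSpec` interfaces (all names here are illustrative, not the actual signatures):

   ```java
   // Stand-ins for the real Druid types, just to illustrate the ownership pattern.
   interface SupervisorTaskAutoScaler {}
   
   interface SupervisorSpec
   {
       SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor);
   }
   
   abstract class Supervisor
   {
       private SupervisorTaskAutoScaler autoScaler;
   
       // Proposed method: the supervisor builds and keeps its own autoscaler,
       // instead of having one injected after construction.
       public SupervisorTaskAutoScaler createTaskAutoScaler(SupervisorSpec spec)
       {
           if (autoScaler == null) {
               autoScaler = spec.createAutoscaler(this);
           }
           return autoScaler;
       }
   
       public SupervisorTaskAutoScaler getTaskAutoScaler()
       {
           return autoScaler;
       }
   }
   
   public class AutoScalerWiringSketch
   {
       public static void main(String[] args)
       {
           Supervisor supervisor = new Supervisor() {};
           SupervisorSpec spec = s -> new SupervisorTaskAutoScaler() {};
   
           SupervisorTaskAutoScaler scaler = supervisor.createTaskAutoScaler(spec);
           // The supervisor now owns the autoscaler; no external injection step.
           System.out.println(scaler == supervisor.getTaskAutoScaler()); // prints "true"
       }
   }
   ```

   With this shape, `SupervisorManager` would no longer need the `instanceof` check and the setter call after creating the autoscaler.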



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java:
##########
@@ -215,6 +223,64 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
     cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
   }
 
+  @Test
+  @Timeout(300)
+  void test_scaleDownDuringTaskRollover()
+  {
+    final String superId = dataSource + "_super";
+    final int initialTaskCount = 10;
+
+    final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
+        .builder()
+        .enableTaskAutoScaler(true)
+        .taskCountMin(1)
+        .taskCountMax(10)
+        .taskCountStart(initialTaskCount)
+        .scaleActionPeriodMillis(2000)
+        .minTriggerScaleActionFrequencyMillis(2000)
+        // High idle weight ensures scale-down when tasks are mostly idle (little data to process)
+        .lagWeight(0.1)
+        .idleWeight(0.9)
+        .build();
+
+    final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, initialTaskCount);
+
+    // Submit the supervisor
+    Assertions.assertEquals(superId, cluster.callApi().postSupervisor(spec));
+
+    // Wait for at least one task to be running for the datasource managed by the supervisor.
+    overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/run/time")
+                                                   .hasDimension(DruidMetrics.DATASOURCE, dataSource));
+
+    // Wait for the autoscaler to emit a metric indicating scale-down; the value should be less than the current task count.
+    overlord.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
+                      .hasValueMatching(Matchers.lessThan((long) initialTaskCount)));
+
+    // Wait for tasks to complete (first rollover)
+    overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/action/success/count"));
+
+    // Wait for a task to be running for the datasource managed by the supervisor.
+    overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/run/time")
+                                                   .hasDimension(DruidMetrics.DATASOURCE, dataSource));
+
+    // After rollover, verify that the running task count has decreased
+    // The autoscaler should have recommended fewer tasks due to high idle time
+    final int postRolloverRunningTasks = cluster.callApi().getTaskCount("running", dataSource);
+
+    Assertions.assertTrue(

Review Comment:
   Thanks for adding this verification.
   
   Can we update the existing scale down test to verify that scaling was actually skipped in that case?
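   For reference, a self-contained sketch of what such an assertion could look like. The `RecordingEmitter` class and the metric names below are hypothetical stand-ins for the embedded cluster's latchable emitter and the real autoscaler metrics:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical stand-in for the embedded test cluster's latchable emitter.
   class RecordingEmitter
   {
       private final List<String> metricNames = new ArrayList<>();
   
       void emit(String metricName)
       {
           metricNames.add(metricName);
       }
   
       boolean hasEvent(String metricName)
       {
           return metricNames.contains(metricName);
       }
   }
   
   public class ScaleSkipAssertionSketch
   {
       public static void main(String[] args)
       {
           RecordingEmitter emitter = new RecordingEmitter();
   
           // Simulated run: the autoscaler reports a lower optimal count but defers
           // the change to task rollover instead of performing a scale action.
           emitter.emit("task/autoScaler/requiredCount");
   
           // The existing scale-down test could then assert that no immediate
           // scale action event was ever emitted, i.e. scaling was skipped.
           if (emitter.hasEvent("task/autoScaler/scaleActionTime")) {
               throw new AssertionError("expected the scale action to be skipped");
           }
       }
   }
   ```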



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java:
##########
@@ -161,6 +155,20 @@ public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
     cluster.callApi().postSupervisor(spec.createSuspendedSpec());
   }
 
+  /**
+   * Tests that scale-down happens during task rollover via {@code checkTaskDuration()}.
+   *
+   * <p>Test flow:</p>
+   * <ol>
+   *   <li>Start the supervisor with 10 tasks and 50 partitions, with minimal data (500 records)</li>
+   *   <li>Wait for the initial tasks to start running</li>
+   *   <li>Wait for the first task rollover to complete (task duration is 8 seconds)</li>
+   *   <li>Verify that after rollover, fewer tasks are running due to the cost-based autoscaler (no ingestion at all)</li>
+   * </ol>
+   *
+   * <p>Scale-down during rollover is triggered in {@code SeekableStreamSupervisor.checkTaskDuration()}
+   * when all task groups have rolled over and the autoscaler recommends a lower task count.</p>
+   */

Review Comment:
   I feel we should omit the javadoc. The test itself should be readable enough to follow the details.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java:
##########
@@ -258,7 +324,7 @@ private KafkaSupervisorSpec createKafkaSupervisorWithAutoScaler(
             ioConfig -> ioConfig
                 .withConsumerProperties(kafkaServer.consumerProperties())
                 .withTaskCount(taskCount)
-                .withTaskDuration(Seconds.THREE.toPeriod())
+                .withTaskDuration(Seconds.parseSeconds("PT7S").toPeriod())

Review Comment:
   ```suggestion
                   .withTaskDuration(Period.seconds(7))
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -129,46 +136,15 @@ public void reset()
     // No-op.
   }
 
-  private CostMetrics collectMetrics()

Review Comment:
   Nit: Can we retain the original order of methods? It might help clean up the patch.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -184,7 +160,7 @@ private CostMetrics collectMetrics()
    *
    * @return optimal task count for scale-up, or -1 if no scaling action needed
    */
-  public int computeOptimalTaskCount(CostMetrics metrics)
+  int computeOptimalTaskCount(CostMetrics metrics, CostComputeMode costComputeMode)

Review Comment:
   Nit: Rather than using a `ComputeMode` or even a boolean flag, it would be cleaner to just add two new methods, both of which call the `computeOptimalTaskCount` method.
   
   ```java
   public int computeTaskCountOnRollover()
   {
       // Perform both scale-downs and scale-ups
       return computeOptimalTaskCount(collectMetrics());
   }
   
   public int computeTaskCountForScaleAction()
   {
       final CostMetrics metrics = collectMetrics();
       final int currentTaskCount = metrics.currentTaskCount();
       final int optimalTaskCount = computeOptimalTaskCount(metrics);
   
       // Perform only scale-up actions
       return optimalTaskCount >= currentTaskCount ? optimalTaskCount : -1;
   }
   ```
   
   This would significantly simplify the diff and clarify the intent better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

