kfaraz commented on code in PR #18860:
URL: https://github.com/apache/druid/pull/18860#discussion_r2639781962
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+/**
+ * Holds the result of a cost computation from {@link WeightedCostFunction#computeCost}.
+ * All costs are measured in seconds.
+ */
+public class CostResult

Review Comment:
   Thanks for adding this.

##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3339,43 +3346,43 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
     final AtomicInteger numStoppedTasks = new AtomicInteger();
     // Sort task groups by start time to prioritize early termination of earlier groups, then iterate for processing
     activelyReadingTaskGroups.entrySet().stream().sorted(
-            Comparator.comparingLong(

Review Comment:
   Please revert all the formatting changes in this file.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -504,6 +504,11 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe
       supervisor = spec.createSupervisor();
       autoscaler = spec.createAutoscaler(supervisor);
+      // Wire autoscaler back to supervisor for rollover-based scale-down
+      if (supervisor instanceof SeekableStreamSupervisor && autoscaler != null) {

Review Comment:
   It feels weird to first create the auto-scaler and then inject it back into the supervisor. How about we add a `createTaskAutoScaler()` method on the `Supervisor` interface itself? Internally, this method would simply call `spec.createAutoscaler(this)` and initialize the supervisor's own auto-scaler field if needed.

##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java:
##########
@@ -215,6 +223,64 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
     cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
   }
+  @Test
+  @Timeout(300)
+  void test_scaleDownDuringTaskRollover()
+  {
+    final String superId = dataSource + "_super";
+    final int initialTaskCount = 10;
+
+    final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
+        .builder()
+        .enableTaskAutoScaler(true)
+        .taskCountMin(1)
+        .taskCountMax(10)
+        .taskCountStart(initialTaskCount)
+        .scaleActionPeriodMillis(2000)
+        .minTriggerScaleActionFrequencyMillis(2000)
+        // High idle weight ensures scale-down when tasks are mostly idle (little data to process)
+        .lagWeight(0.1)
+        .idleWeight(0.9)
+        .build();
+
+    final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, initialTaskCount);
+
+    // Submit the supervisor
+    Assertions.assertEquals(superId, cluster.callApi().postSupervisor(spec));
+
+    // Wait for at least one task running for the datasource managed by the supervisor.
+    overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/run/time")
+                                                   .hasDimension(DruidMetrics.DATASOURCE, dataSource));
+
+    // Wait for autoscaler to emit metric indicating scale-down, it should be just less than the current task count.
+    overlord.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
+                      .hasValueMatching(Matchers.lessThan((long) initialTaskCount)));
+
+    // Wait for tasks to complete (first rollover)
+    overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/action/success/count"));
+
+    // Wait for the task running for the datasource managed by a supervisor.
+    overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/run/time")
+                                                   .hasDimension(DruidMetrics.DATASOURCE, dataSource));
+
+    // After rollover, verify that the running task count has decreased
+    // The autoscaler should have recommended fewer tasks due to high idle time
+    final int postRolloverRunningTasks = cluster.callApi().getTaskCount("running", dataSource);
+
+    Assertions.assertTrue(

Review Comment:
   Thanks for adding this verification. Can we update the existing scale-down test to verify that scaling was actually skipped in that case?

##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java:
##########
@@ -161,6 +155,20 @@ public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
     cluster.callApi().postSupervisor(spec.createSuspendedSpec());
   }
+  /**
+   * Tests that scale down happen during task rollover via checkTaskDuration().
+   *
+   * <p>Test flow:</p>
+   * <ol>
+   *   <li>Start supervisor with 10 tasks and 50 partitions, minimal data (500 records)</li>
+   *   <li>Wait for initial tasks to start running</li>
+   *   <li>Wait for the first task rollover to complete (task duration is 8 seconds)</li>
+   *   <li>Verify that after rollover, fewer tasks are running due to cost-based autoscaler (no ingestion at all)</li>
+   * </ol>
+   *
+   * <p>Scale down during rollover is triggered in {@code SeekableStreamSupervisor.checkTaskDuration()}
+   * when all task groups have rolled over and the autoscaler recommends a lower task count.</p>
+   */

Review Comment:
   I feel we should omit the javadoc. The test itself should be readable enough to follow the details.

##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java:
##########
@@ -258,7 +324,7 @@ private KafkaSupervisorSpec createKafkaSupervisorWithAutoScaler(
             ioConfig -> ioConfig
                 .withConsumerProperties(kafkaServer.consumerProperties())
                 .withTaskCount(taskCount)
-                .withTaskDuration(Seconds.THREE.toPeriod())
+                .withTaskDuration(Seconds.parseSeconds("PT7S").toPeriod())

Review Comment:
   ```suggestion
                   .withTaskDuration(Period.seconds(7))
   ```

##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -129,46 +136,15 @@ public void reset()
     // No-op.
   }
-  private CostMetrics collectMetrics()

Review Comment:
   Nit: Can we retain the original order of methods? It might help clean up the patch.
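To make the `createTaskAutoScaler()` suggestion on `SupervisorManager` more concrete, here is a minimal sketch of the wiring it implies. Every type below (`TaskAutoScaler`, `Supervisor`, `SupervisorSpec`, `StreamSupervisor`, `CountingAutoScaler`, `DemoSpec`, `SupervisorManagerSketch`) is a hypothetical, trimmed-down stand-in, not Druid's actual interface. The point is only that the supervisor asks its own spec for the auto-scaler and keeps the reference, so the manager needs neither an `instanceof` check nor a setter.

```java
// Hypothetical, trimmed-down stand-ins; Druid's real interfaces have many more methods.
interface TaskAutoScaler {
  void start();
}

interface Supervisor {
  void start();

  // Proposed hook: each supervisor creates (and may keep) its own auto-scaler.
  // The default covers supervisors that do not support auto-scaling.
  default TaskAutoScaler createTaskAutoScaler() {
    return null;
  }
}

interface SupervisorSpec {
  Supervisor createSupervisor();

  TaskAutoScaler createAutoscaler(Supervisor supervisor);
}

class StreamSupervisor implements Supervisor {
  private final SupervisorSpec spec;
  private TaskAutoScaler autoScaler; // consulted later, e.g. during task rollover

  StreamSupervisor(SupervisorSpec spec) {
    this.spec = spec;
  }

  @Override
  public void start() {
    // A real supervisor would begin managing tasks here.
  }

  @Override
  public TaskAutoScaler createTaskAutoScaler() {
    // Delegate to the spec and keep the reference; no setter or instanceof needed.
    autoScaler = spec.createAutoscaler(this);
    return autoScaler;
  }

  boolean hasAutoScaler() {
    return autoScaler != null;
  }
}

class CountingAutoScaler implements TaskAutoScaler {
  boolean started;

  @Override
  public void start() {
    started = true;
  }
}

class DemoSpec implements SupervisorSpec {
  final CountingAutoScaler autoScaler = new CountingAutoScaler();

  @Override
  public Supervisor createSupervisor() {
    return new StreamSupervisor(this);
  }

  @Override
  public TaskAutoScaler createAutoscaler(Supervisor supervisor) {
    return autoScaler;
  }
}

class SupervisorManagerSketch {
  // Loosely mirrors createAndStartSupervisorInternal(), minus locking and error handling.
  static Supervisor createAndStart(SupervisorSpec spec) {
    final Supervisor supervisor = spec.createSupervisor();
    final TaskAutoScaler autoScaler = supervisor.createTaskAutoScaler();
    supervisor.start();
    if (autoScaler != null) {
      autoScaler.start();
    }
    return supervisor;
  }

  public static void main(String[] args) {
    final DemoSpec spec = new DemoSpec();
    final StreamSupervisor supervisor = (StreamSupervisor) createAndStart(spec);
    System.out.println("auto-scaler retained: " + supervisor.hasAutoScaler()
        + ", started: " + spec.autoScaler.started);
  }
}
```

Supervisors that do not support auto-scaling simply inherit the default and return `null`, so the manager keeps a single code path.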
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -184,7 +160,7 @@ private CostMetrics collectMetrics()
    *
    * @return optimal task count for scale-up, or -1 if no scaling action needed
    */
-  public int computeOptimalTaskCount(CostMetrics metrics)
+  int computeOptimalTaskCount(CostMetrics metrics, CostComputeMode costComputeMode)

Review Comment:
   Nit: Rather than using a `CostComputeMode` or even a boolean flag, it would be cleaner to just add two new methods, both of which call the `computeOptimalTaskCount` method:

   ```java
   public int computeTaskCountOnRollover() {
     // Perform both scale downs and scale ups
     return computeOptimalTaskCount(collectMetrics());
   }

   public int computeTaskCountForScaleAction() {
     final CostMetrics metrics = collectMetrics();
     final int currentTaskCount = metrics.currentTaskCount();
     final int optimalTaskCount = computeOptimalTaskCount(metrics);

     // Perform only scale up actions
     return optimalTaskCount >= currentTaskCount ? optimalTaskCount : -1;
   }
   ```

   This would significantly simplify the diff and make the intent clearer.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
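A self-contained sketch of the two-method split suggested in the `CostBasedAutoScaler` comment, under a deliberately toy cost model. `TaskCountPolicy`, `ToyCostMetrics`, its fields, and the lag/idle formula are all hypothetical and are not the PR's `WeightedCostFunction`; a `Supplier` stands in for `collectMetrics()`. It only illustrates the intended policy: the rollover path may scale down or up, while the periodic scale action discards scale-down recommendations.

```java
import java.util.function.Supplier;

// Hypothetical metrics snapshot; the PR's CostMetrics has different fields.
final class ToyCostMetrics {
  final int currentTaskCount;
  final double lagRecords;           // total lag across partitions
  final double recordsPerSecPerTask; // assumed processing rate of one task
  final double idleSecondsPerTask;   // assumed idle time contributed by each task

  ToyCostMetrics(int currentTaskCount, double lagRecords,
                 double recordsPerSecPerTask, double idleSecondsPerTask) {
    this.currentTaskCount = currentTaskCount;
    this.lagRecords = lagRecords;
    this.recordsPerSecPerTask = recordsPerSecPerTask;
    this.idleSecondsPerTask = idleSecondsPerTask;
  }
}

final class TaskCountPolicy {
  private final Supplier<ToyCostMetrics> collectMetrics; // stands in for collectMetrics()
  private final int taskCountMin;
  private final int taskCountMax;
  private final double lagWeight;
  private final double idleWeight;

  TaskCountPolicy(Supplier<ToyCostMetrics> collectMetrics, int taskCountMin, int taskCountMax,
                  double lagWeight, double idleWeight) {
    this.collectMetrics = collectMetrics;
    this.taskCountMin = taskCountMin;
    this.taskCountMax = taskCountMax;
    this.lagWeight = lagWeight;
    this.idleWeight = idleWeight;
  }

  // Rollover path: scale-down and scale-up are both allowed.
  int computeTaskCountOnRollover() {
    return computeOptimalTaskCount(collectMetrics.get());
  }

  // Periodic scale-action path: only scale-up is allowed; scale-down waits for rollover.
  int computeTaskCountForScaleAction() {
    final ToyCostMetrics metrics = collectMetrics.get();
    final int optimalTaskCount = computeOptimalTaskCount(metrics);
    return optimalTaskCount >= metrics.currentTaskCount ? optimalTaskCount : -1;
  }

  // Toy weighted cost: lag drain time shrinks with more tasks, idle time grows with them.
  // Returns the cheapest count in [min, max], or -1 if it equals the current count.
  int computeOptimalTaskCount(ToyCostMetrics m) {
    int best = taskCountMin;
    double bestCost = Double.POSITIVE_INFINITY;
    for (int n = taskCountMin; n <= taskCountMax; n++) {
      final double lagSeconds = m.lagRecords / (n * m.recordsPerSecPerTask);
      final double idleSeconds = n * m.idleSecondsPerTask;
      final double cost = lagWeight * lagSeconds + idleWeight * idleSeconds;
      if (cost < bestCost) { // strict '<' keeps the smallest count on ties
        bestCost = cost;
        best = n;
      }
    }
    return best == m.currentTaskCount ? -1 : best;
  }

  public static void main(String[] args) {
    // Idle stream: no lag, 10 running tasks, weights as in the new integration test.
    final TaskCountPolicy idle = new TaskCountPolicy(
        () -> new ToyCostMetrics(10, 0, 100, 1), 1, 10, 0.1, 0.9);
    System.out.println("rollover: " + idle.computeTaskCountOnRollover());         // rollover: 1
    System.out.println("scale action: " + idle.computeTaskCountForScaleAction()); // scale action: -1
  }
}
```

Keeping the scale-up-only filter in the entry point leaves `computeOptimalTaskCount` a pure function of the metrics, which is roughly what removing the mode parameter buys. The idle case in `main` is also the kind of "scale-down was skipped" condition the existing scale-down test could assert for the periodic path.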
