This is an automated email from the ASF dual-hosted git repository.

Fly-Style pushed a commit to branch feat/poll-idle-ratio-replacement
in repository https://gitbox.apache.org/repos/asf/druid.git

commit c9c833a527217a9f8b9e23bebfcccf5aeed4544e
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Wed Jun 24 11:21:54 2026 +0300

    Add integration test
---
 .../CostBasedAutoScalerIntegrationTest.java        | 52 +++++++++++++++++++++-
 1 file changed, 51 insertions(+), 1 deletion(-)

diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
index 7e63c3f4f25..0c72d07d8e6 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
@@ -180,6 +180,54 @@ public class CostBasedAutoScalerIntegrationTest extends 
StreamIndexTestBase
     
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
   }
 
+  @Test
+  public void 
test_autoScaler_computesOptimalTaskCountAndProducesScaleUp_withUtilizationRatio()
+  {
+
+    final int lowInitialTaskCount = 1;
+    // Publish records in the background so that tasks stay busy processing (low idle ratio)
+    Executors.newSingleThreadExecutor().submit(() -> {
+      for (int i = 0; i < 500; ++i) {
+        publish1kRecords(topic, true);
+      }
+    });
+
+    final CostBasedAutoScalerConfig autoScalerConfig = 
CostBasedAutoScalerConfig
+        .builder()
+        .enableTaskAutoScaler(true)
+        .taskCountMin(1)
+        .taskCountMax(50)
+        .taskCountStart(lowInitialTaskCount)
+        .scaleActionPeriodMillis(500)
+        .minTriggerScaleActionFrequencyMillis(1000)
+        .lagWeight(0.8)
+        .idleWeight(0.2)
+        .useUtilizationRatio(true)
+        .build();
+
+    final KafkaSupervisorSpec kafkaSupervisorSpec = 
createKafkaSupervisorWithAutoScaler(
+        autoScalerConfig,
+        lowInitialTaskCount
+    );
+
+    Assertions.assertEquals(kafkaSupervisorSpec.getId(), 
cluster.callApi().postSupervisor(kafkaSupervisorSpec));
+
+    // Wait until the supervisor is running
+    overlord.latchableEmitter()
+            .waitForEvent(event -> event.hasMetricName("task/run/time")
+                                        .hasDimension(DruidMetrics.DATASOURCE, 
dataSource));
+
+    // With 50 partitions and high lag saturating the single task's processing 
rate,
+    // the utilization ratio must drive the cost function to recommend scaling 
up.
+    overlord.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
+                      .hasValueMatching(Matchers.greaterThan(1L))
+    );
+
+    // Suspend the supervisor
+    
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
+  }
+
   @Test
   public void test_autoScaler_scalesUpAndDown_withSlowPublish()
   {
@@ -337,7 +385,9 @@ public class CostBasedAutoScalerIntegrationTest extends 
StreamIndexTestBase
     final String getSupervisorPath = 
StringUtils.format("/druid/indexer/v1/supervisor/%s", supervisorId);
     final KafkaSupervisorSpec supervisorSpec = 
cluster.callApi().serviceClient().onLeaderOverlord(
         mapper -> new RequestBuilder(HttpMethod.GET, getSupervisorPath),
-        new TypeReference<>(){}
+        new TypeReference<>()
+        {
+        }
     );
     Assertions.assertNotNull(supervisorSpec);
     return supervisorSpec.getSpec().getIOConfig().getTaskCount();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to