This is an automated email from the ASF dual-hosted git repository. Fly-Style pushed a commit to branch feat/poll-idle-ratio-replacement in repository https://gitbox.apache.org/repos/asf/druid.git
commit c9c833a527217a9f8b9e23bebfcccf5aeed4544e Author: Sasha Syrotenko <[email protected]> AuthorDate: Wed Jun 24 11:21:54 2026 +0300 Add integration test --- .../CostBasedAutoScalerIntegrationTest.java | 52 +++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java index 7e63c3f4f25..0c72d07d8e6 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java @@ -180,6 +180,54 @@ public class CostBasedAutoScalerIntegrationTest extends StreamIndexTestBase cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec()); } + @Test + public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp_withUtilizationRatio() + { + + final int lowInitialTaskCount = 1; + // This ensures tasks are busy processing (low idle ratio) + Executors.newSingleThreadExecutor().submit(() -> { + for (int i = 0; i < 500; ++i) { + publish1kRecords(topic, true); + } + }); + + final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig + .builder() + .enableTaskAutoScaler(true) + .taskCountMin(1) + .taskCountMax(50) + .taskCountStart(lowInitialTaskCount) + .scaleActionPeriodMillis(500) + .minTriggerScaleActionFrequencyMillis(1000) + .lagWeight(0.8) + .idleWeight(0.2) + .useUtilizationRatio(true) + .build(); + + final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisorWithAutoScaler( + autoScalerConfig, + lowInitialTaskCount + ); + + Assertions.assertEquals(kafkaSupervisorSpec.getId(), cluster.callApi().postSupervisor(kafkaSupervisorSpec)); + + // Wait for the supervisor is running + overlord.latchableEmitter() + .waitForEvent(event -> event.hasMetricName("task/run/time") + .hasDimension(DruidMetrics.DATASOURCE, dataSource)); + + // With 50 partitions and high lag saturating the single task's processing rate, + // the utilization ratio must drive the cost function to recommend scaling up. + overlord.latchableEmitter().waitForEvent( + event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC) + .hasValueMatching(Matchers.greaterThan(1L)) + ); + + // Suspend the supervisor + cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec()); + } + @Test public void test_autoScaler_scalesUpAndDown_withSlowPublish() { @@ -337,7 +385,9 @@ public class CostBasedAutoScalerIntegrationTest extends StreamIndexTestBase final String getSupervisorPath = StringUtils.format("/druid/indexer/v1/supervisor/%s", supervisorId); final KafkaSupervisorSpec supervisorSpec = cluster.callApi().serviceClient().onLeaderOverlord( mapper -> new RequestBuilder(HttpMethod.GET, getSupervisorPath), - new TypeReference<>(){} + new TypeReference<>() + { + } ); Assertions.assertNotNull(supervisorSpec); return supervisorSpec.getSpec().getIOConfig().getTaskCount(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
