This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 97652d30d31 Rely on `taskCountMin` in `computeValidTaskCounts`; 
correct the embedded test for cost-based-autoscaler (#18963)
97652d30d31 is described below

commit 97652d30d312f6cf56474a0f29156f3d1bb4328b
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Thu Jan 29 16:20:50 2026 +0200

    Rely on `taskCountMin` in `computeValidTaskCounts`; correct the embedded 
test for cost-based-autoscaler (#18963)
    
    This patch fixes a bug where computeValidTaskCounts enforced the upper 
bound (taskCountMax) but ignored the lower bound (taskCountMin).
    
    It also fixes a flaky embedded test.
---
 .../CostBasedAutoScalerIntegrationTest.java        |  2 +-
 .../supervisor/autoscaler/CostBasedAutoScaler.java |  4 ++-
 .../autoscaler/CostBasedAutoScalerTest.java        | 34 ++++++++++++++++------
 3 files changed, 29 insertions(+), 11 deletions(-)

diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
index ee9712f8f82..ad23e0b6fbe 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
@@ -227,7 +227,7 @@ public class CostBasedAutoScalerIntegrationTest extends 
EmbeddedClusterTestBase
         // High idle weight ensures scale-down when tasks are mostly idle 
(little data to process)
         .lagWeight(0.1)
         .idleWeight(0.9)
-        .scaleDownDuringTaskRolloverOnly(false)
+        .scaleDownDuringTaskRolloverOnly(true)
         // Do not slow scale-downs
         .scaleDownBarrier(0)
         .build();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index dcdeea0ccd0..a7ea833da34 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -215,6 +215,7 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
         partitionCount,
         currentTaskCount,
         (long) metrics.getAggregateLag(),
+        config.getTaskCountMin(),
         config.getTaskCountMax()
     );
 
@@ -276,6 +277,7 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
       int partitionCount,
       int currentTaskCount,
       double aggregateLag,
+      int taskCountMin,
       int taskCountMax
   )
   {
@@ -302,7 +304,7 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
 
     for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= 
minPartitionsPerTask; partitionsPerTask--) {
       final int taskCount = (partitionCount + partitionsPerTask - 1) / 
partitionsPerTask;
-      if (taskCount <= taskCountMax) {
+      if (taskCount >= taskCountMin && taskCount <= taskCountMax) {
         result.add(taskCount);
       }
     }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 23192900ec0..17c247f24b0 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -76,38 +76,54 @@ public class CostBasedAutoScalerTest
   public void testComputeValidTaskCounts()
   {
     // For 100 partitions at 25 tasks (4 partitions/task), valid counts 
include 25 and 34
-    int[] validTaskCounts = computeValidTaskCounts(100, 25, 0L, 100);
+    int[] validTaskCounts = computeValidTaskCounts(100, 25, 0L, 1, 100);
     Assert.assertTrue("Should contain the current task count", 
contains(validTaskCounts, 25));
     Assert.assertTrue("Should contain the next scale-up option", 
contains(validTaskCounts, 34));
 
     // Edge cases
-    Assert.assertEquals(0, computeValidTaskCounts(0, 10, 0L, 100).length);
-    Assert.assertEquals(0, computeValidTaskCounts(-5, 10, 0L, 100).length);
+    Assert.assertEquals(0, computeValidTaskCounts(0, 10, 0L, 1, 100).length);
+    Assert.assertEquals(0, computeValidTaskCounts(-5, 10, 0L, 1, 100).length);
 
     // Single partition
-    int[] singlePartition = computeValidTaskCounts(1, 1, 0L, 100);
+    int[] singlePartition = computeValidTaskCounts(1, 1, 0L, 1, 100);
     Assert.assertTrue("Single partition should have at least one valid count", 
singlePartition.length > 0);
     Assert.assertTrue("Single partition should contain 1", 
contains(singlePartition, 1));
 
     // Current exceeds partitions - should still yield valid, deduplicated 
options
-    int[] exceedsPartitions = computeValidTaskCounts(2, 5, 0L, 100);
+    int[] exceedsPartitions = computeValidTaskCounts(2, 5, 0L, 1, 100);
     Assert.assertEquals(2, exceedsPartitions.length);
     Assert.assertTrue(contains(exceedsPartitions, 1));
     Assert.assertTrue(contains(exceedsPartitions, 2));
 
     // Lag expansion: low lag should not include max, high lag should
-    int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 30);
+    int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 1, 30);
     Assert.assertFalse("Low lag should not include max task count", 
contains(lowLagCounts, 30));
     Assert.assertTrue("Low lag should cap scale up around 4 tasks", 
contains(lowLagCounts, 4));
 
     long highAggregateLag = 30L * 500_000L;
-    int[] highLagCounts = computeValidTaskCounts(30, 3, highAggregateLag, 30);
+    int[] highLagCounts = computeValidTaskCounts(30, 3, highAggregateLag, 1, 
30);
     Assert.assertTrue("High lag should allow scaling to max tasks", 
contains(highLagCounts, 30));
 
     // Respects taskCountMax
-    int[] cappedCounts = computeValidTaskCounts(30, 4, highAggregateLag, 3);
+    int[] cappedCounts = computeValidTaskCounts(30, 4, highAggregateLag, 1, 3);
     Assert.assertTrue("Should include taskCountMax when doable", 
contains(cappedCounts, 3));
     Assert.assertFalse("Should not exceed taskCountMax", 
contains(cappedCounts, 4));
+
+    // Respects taskCountMin - filters out values below the minimum
+    // With partitionCount=100, currentTaskCount=10, the computed range 
includes values like 8, 9, 10, 12, 13
+    int[] minCappedCounts = computeValidTaskCounts(100, 10, 0L, 10, 100);
+    Assert.assertFalse("Should not go below taskCountMin", 
contains(minCappedCounts, 8));
+    Assert.assertFalse("Should not go below taskCountMin", 
contains(minCappedCounts, 9));
+    Assert.assertTrue("Should include values at taskCountMin", 
contains(minCappedCounts, 10));
+    Assert.assertTrue("Should include values above taskCountMin", 
contains(minCappedCounts, 12));
+
+    // Both bounds applied together
+    int[] bothBounds = computeValidTaskCounts(100, 10, 0L, 10, 12);
+    Assert.assertFalse("Should not go below taskCountMin", 
contains(bothBounds, 8));
+    Assert.assertFalse("Should not go below taskCountMin", 
contains(bothBounds, 9));
+    Assert.assertFalse("Should not exceed taskCountMax", contains(bothBounds, 
13));
+    Assert.assertTrue("Should include values at taskCountMin", 
contains(bothBounds, 10));
+    Assert.assertTrue("Should include values at taskCountMax", 
contains(bothBounds, 12));
   }
 
   @Test
@@ -170,7 +186,7 @@ public class CostBasedAutoScalerTest
 
     for (Example example : examples) {
       long aggregateLag = example.lagPerPartition * partitionCount;
-      int[] validCounts = computeValidTaskCounts(partitionCount, 
example.currentTasks, aggregateLag, taskCountMax);
+      int[] validCounts = computeValidTaskCounts(partitionCount, 
example.currentTasks, aggregateLag, 1, taskCountMax);
       Assert.assertTrue(
           "Should include expected task count for current=" + 
example.currentTasks + ", lag=" + example.lagPerPartition,
           contains(validCounts, example.expectedTasks)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to