This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 97652d30d31 Rely on `taskCountMin` in `computeValidTaskCounts`;
correct the embedded test for cost-based-autoscaler (#18963)
97652d30d31 is described below
commit 97652d30d312f6cf56474a0f29156f3d1bb4328b
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Thu Jan 29 16:20:50 2026 +0200
Rely on `taskCountMin` in `computeValidTaskCounts`; correct the embedded
test for cost-based-autoscaler (#18963)
This patch fixes computeValidTaskCounts, which enforced the upper bound
(taskCountMax) on candidate task counts but ignored the lower bound
(taskCountMin). It also fixes a flaky embedded test.
---
.../CostBasedAutoScalerIntegrationTest.java | 2 +-
.../supervisor/autoscaler/CostBasedAutoScaler.java | 4 ++-
.../autoscaler/CostBasedAutoScalerTest.java | 34 ++++++++++++++++------
3 files changed, 29 insertions(+), 11 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
index ee9712f8f82..ad23e0b6fbe 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
@@ -227,7 +227,7 @@ public class CostBasedAutoScalerIntegrationTest extends
EmbeddedClusterTestBase
// High idle weight ensures scale-down when tasks are mostly idle
(little data to process)
.lagWeight(0.1)
.idleWeight(0.9)
- .scaleDownDuringTaskRolloverOnly(false)
+ .scaleDownDuringTaskRolloverOnly(true)
// Do not slow scale-downs
.scaleDownBarrier(0)
.build();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index dcdeea0ccd0..a7ea833da34 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -215,6 +215,7 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
partitionCount,
currentTaskCount,
(long) metrics.getAggregateLag(),
+ config.getTaskCountMin(),
config.getTaskCountMax()
);
@@ -276,6 +277,7 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
int partitionCount,
int currentTaskCount,
double aggregateLag,
+ int taskCountMin,
int taskCountMax
)
{
@@ -302,7 +304,7 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >=
minPartitionsPerTask; partitionsPerTask--) {
final int taskCount = (partitionCount + partitionsPerTask - 1) /
partitionsPerTask;
- if (taskCount <= taskCountMax) {
+ if (taskCount >= taskCountMin && taskCount <= taskCountMax) {
result.add(taskCount);
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 23192900ec0..17c247f24b0 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -76,38 +76,54 @@ public class CostBasedAutoScalerTest
public void testComputeValidTaskCounts()
{
// For 100 partitions at 25 tasks (4 partitions/task), valid counts
include 25 and 34
- int[] validTaskCounts = computeValidTaskCounts(100, 25, 0L, 100);
+ int[] validTaskCounts = computeValidTaskCounts(100, 25, 0L, 1, 100);
Assert.assertTrue("Should contain the current task count",
contains(validTaskCounts, 25));
Assert.assertTrue("Should contain the next scale-up option",
contains(validTaskCounts, 34));
// Edge cases
- Assert.assertEquals(0, computeValidTaskCounts(0, 10, 0L, 100).length);
- Assert.assertEquals(0, computeValidTaskCounts(-5, 10, 0L, 100).length);
+ Assert.assertEquals(0, computeValidTaskCounts(0, 10, 0L, 1, 100).length);
+ Assert.assertEquals(0, computeValidTaskCounts(-5, 10, 0L, 1, 100).length);
// Single partition
- int[] singlePartition = computeValidTaskCounts(1, 1, 0L, 100);
+ int[] singlePartition = computeValidTaskCounts(1, 1, 0L, 1, 100);
Assert.assertTrue("Single partition should have at least one valid count",
singlePartition.length > 0);
Assert.assertTrue("Single partition should contain 1",
contains(singlePartition, 1));
// Current exceeds partitions - should still yield valid, deduplicated
options
- int[] exceedsPartitions = computeValidTaskCounts(2, 5, 0L, 100);
+ int[] exceedsPartitions = computeValidTaskCounts(2, 5, 0L, 1, 100);
Assert.assertEquals(2, exceedsPartitions.length);
Assert.assertTrue(contains(exceedsPartitions, 1));
Assert.assertTrue(contains(exceedsPartitions, 2));
// Lag expansion: low lag should not include max, high lag should
- int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 30);
+ int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 1, 30);
Assert.assertFalse("Low lag should not include max task count",
contains(lowLagCounts, 30));
Assert.assertTrue("Low lag should cap scale up around 4 tasks",
contains(lowLagCounts, 4));
long highAggregateLag = 30L * 500_000L;
- int[] highLagCounts = computeValidTaskCounts(30, 3, highAggregateLag, 30);
+ int[] highLagCounts = computeValidTaskCounts(30, 3, highAggregateLag, 1,
30);
Assert.assertTrue("High lag should allow scaling to max tasks",
contains(highLagCounts, 30));
// Respects taskCountMax
- int[] cappedCounts = computeValidTaskCounts(30, 4, highAggregateLag, 3);
+ int[] cappedCounts = computeValidTaskCounts(30, 4, highAggregateLag, 1, 3);
Assert.assertTrue("Should include taskCountMax when doable",
contains(cappedCounts, 3));
Assert.assertFalse("Should not exceed taskCountMax",
contains(cappedCounts, 4));
+
+ // Respects taskCountMin - filters out values below the minimum
+ // With partitionCount=100, currentTaskCount=10, the computed range
includes values like 8, 9, 10, 12, 13
+ int[] minCappedCounts = computeValidTaskCounts(100, 10, 0L, 10, 100);
+ Assert.assertFalse("Should not go below taskCountMin",
contains(minCappedCounts, 8));
+ Assert.assertFalse("Should not go below taskCountMin",
contains(minCappedCounts, 9));
+ Assert.assertTrue("Should include values at taskCountMin",
contains(minCappedCounts, 10));
+ Assert.assertTrue("Should include values above taskCountMin",
contains(minCappedCounts, 12));
+
+ // Both bounds applied together
+ int[] bothBounds = computeValidTaskCounts(100, 10, 0L, 10, 12);
+ Assert.assertFalse("Should not go below taskCountMin",
contains(bothBounds, 8));
+ Assert.assertFalse("Should not go below taskCountMin",
contains(bothBounds, 9));
+ Assert.assertFalse("Should not exceed taskCountMax", contains(bothBounds,
13));
+ Assert.assertTrue("Should include values at taskCountMin",
contains(bothBounds, 10));
+ Assert.assertTrue("Should include values at taskCountMax",
contains(bothBounds, 12));
}
@Test
@@ -170,7 +186,7 @@ public class CostBasedAutoScalerTest
for (Example example : examples) {
long aggregateLag = example.lagPerPartition * partitionCount;
- int[] validCounts = computeValidTaskCounts(partitionCount,
example.currentTasks, aggregateLag, taskCountMax);
+ int[] validCounts = computeValidTaskCounts(partitionCount,
example.currentTasks, aggregateLag, 1, taskCountMax);
Assert.assertTrue(
"Should include expected task count for current=" +
example.currentTasks + ", lag=" + example.lagPerPartition,
contains(validCounts, example.expectedTasks)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]