This is an automated email from the ASF dual-hosted git repository.
capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 99df32ee9af Adjust costs for burst scaleup during heavy lag for
cost-based autoscaler (#18969)
99df32ee9af is described below
commit 99df32ee9af57ba4189f50850fb6e021e32441f5
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Sat Jan 31 16:45:30 2026 +0200
Adjust costs for burst scaleup during heavy lag for cost-based autoscaler
(#18969)
---
.../supervisor/autoscaler/CostBasedAutoScaler.java | 7 ++++---
.../autoscaler/CostBasedAutoScalerTest.java | 23 +++++++++++-----------
.../autoscaler/WeightedCostFunctionTest.java | 2 +-
3 files changed, 17 insertions(+), 15 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index a7ea833da34..060240014be 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -62,7 +62,7 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
* This constant helps control the granularity of lag considerations in
scaling decisions,
* ensuring smoother transitions between scaled states and avoiding abrupt
changes in task counts.
*/
- private static final int LAG_STEP = 100_000;
+ private static final int LAG_STEP = 50_000;
/**
* This parameter fine-tunes autoscaling behavior by adding extra flexibility
* when calculating maximum allowable partitions per task in response to lag,
@@ -71,10 +71,10 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
*/
private static final int BASE_RAW_EXTRA = 5;
// Base PPT lag threshold allowing to activate a burst scaleup to eliminate
high lag.
- static final int EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD = 50_000;
+ static final int EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD = 25_000;
// Extra PPT lag threshold allowing activation of even more aggressive
scaleup to eliminate high lag,
// also enabling lag-amplified idle calculation decay in the cost function
(to reduce idle weight).
- static final int AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD = 100_000;
+ static final int AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD = 50_000;
public static final String LAG_COST_METRIC =
"task/autoScaler/costBased/lagCost";
public static final String IDLE_COST_METRIC =
"task/autoScaler/costBased/idleCost";
@@ -172,6 +172,7 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
log.info("New task count [%d] on supervisor [%s], scaling up",
taskCount, supervisorId);
} else if (!config.isScaleDownOnTaskRolloverOnly()
&& optimalTaskCount < currentTaskCount
+ && optimalTaskCount > 0
&& ++scaleDownCounter >= config.getScaleDownBarrier()) {
taskCount = optimalTaskCount;
scaleDownCounter = 0;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 17c247f24b0..5deae662d75 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -38,6 +38,7 @@ import java.util.Map;
import static
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME;
import static
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIVE_MINUTE_NAME;
import static
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.ONE_MINUTE_NAME;
+import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD;
import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD;
import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeExtraMaxPartitionsPerTaskIncrease;
import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeValidTaskCounts;
@@ -174,14 +175,14 @@ public class CostBasedAutoScalerTest
}
Example[] examples = new Example[]{
- new Example(3, 50_000L, 8),
- new Example(3, 300_000L, 15),
- new Example(3, 500_000L, 30),
- new Example(10, 100_000L, 15),
- new Example(10, 300_000L, 30),
- new Example(10, 500_000L, 30),
- new Example(20, 500_000L, 30),
- new Example(25, 500_000L, 30)
+ new Example(3, EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD, 8),
+ new Example(3, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 3, 15),
+ new Example(3, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 5, 30),
+ new Example(10, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD - 1,
15),
+ new Example(10, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 3,
30),
+ new Example(10, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 10,
30),
+ new Example(20, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 10,
30),
+ new Example(25, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 10,
30)
};
for (Example example : examples) {
@@ -217,13 +218,13 @@ public class CostBasedAutoScalerTest
public void testComputeExtraPPTIncrease()
{
// No extra increase below the threshold
- Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L *
49_000L, 30, 3, 30));
+ Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L *
EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD - 1, 30, 3, 30));
Assert.assertEquals(4, computeExtraMaxPartitionsPerTaskIncrease(30L *
EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD, 30, 3, 30));
// More aggressive increase when the lag is high
- Assert.assertEquals(6, computeExtraMaxPartitionsPerTaskIncrease(30L *
300_000L, 30, 3, 30));
+ Assert.assertEquals(8, computeExtraMaxPartitionsPerTaskIncrease(30L *
AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 5, 30, 3, 30));
// Zero when on max task count
- Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L *
500_000L, 30, 30, 30));
+ Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L *
AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 10, 30, 30, 30));
}
@Test
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
index 416def7e3ab..4f5832915f6 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
@@ -310,7 +310,7 @@ public class WeightedCostFunctionTest
int partitionCount = 30;
double pollIdleRatio = 0.1;
- CostMetrics lowLag = createMetrics(40_000.0, currentTaskCount,
partitionCount, pollIdleRatio);
+ CostMetrics lowLag = createMetrics(5_000.0, currentTaskCount,
partitionCount, pollIdleRatio);
CostMetrics highLag = createMetrics(500_000.0, currentTaskCount,
partitionCount, pollIdleRatio);
double lowLagCost = costFunction.computeCost(lowLag, proposedTaskCount,
idleOnlyConfig).totalCost();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]