This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 896a692090a Replace idle decay with logarithmic lag acceleration, get
rid of dominant task duration in cost fn (#18991)
896a692090a is described below
commit 896a692090a466b62c5adf55f1cb2dfb0238c1da
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Tue Feb 10 16:40:03 2026 +0200
Replace idle decay with logarithmic lag acceleration, get rid of dominant
task duration in cost fn (#18991)
---
.../autoscaler/WeightedCostFunction.java | 39 ++---
.../autoscaler/WeightedCostFunctionTest.java | 158 ++++++++-------------
2 files changed, 72 insertions(+), 125 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
index 01b35242ed8..2fbb365fb99 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
@@ -29,12 +29,7 @@ import org.apache.druid.java.util.common.logger.Logger;
public class WeightedCostFunction
{
private static final Logger log = new Logger(WeightedCostFunction.class);
- /**
- * The lag severity at which lagBusyFactor reaches 1.0 (full idle
suppression).
- * lagSeverity is defined as lagPerPartition / highLagThreshold.
- * At severity=1 (threshold), lagBusyFactor=0. At severity=MAX,
lagBusyFactor=1.0.
- */
- private static final int LAG_AMPLIFICATION_MAX_SEVERITY = 5;
+ private static final double LAG_AMPLIFICATION_MULTIPLIER = 0.2;
/**
* Computes cost for a given task count using compute time metrics.
@@ -73,12 +68,19 @@ public class WeightedCostFunction
}
} else {
// Lag recovery time is decreasing by adding tasks and increasing by
ejecting tasks.
+ // When there is positive lag, we amplify it by a factor that grows with the
log of lag per partition (never below 1.0) to reflect the urgency of addressing it.
// Caution: we rely only on the metrics, the real issues may be
absolutely different, up to hardware failure.
- lagRecoveryTime = metrics.getAggregateLag() / (proposedTaskCount *
avgProcessingRate);
+ if (metrics.getAggregateLag() <= 0) {
+ lagRecoveryTime = 0;
+ } else {
+ final double lagPerPartition = metrics.getAggregateLag() /
metrics.getPartitionCount();
+ final double amplification = Math.max(1.0, 1.0 +
LAG_AMPLIFICATION_MULTIPLIER * Math.log(lagPerPartition));
+ lagRecoveryTime = metrics.getAggregateLag() * amplification /
(proposedTaskCount * avgProcessingRate);
+ }
}
- final double predictedIdleRatio = estimateIdleRatio(metrics,
proposedTaskCount, config.getHighLagThreshold());
- final double idleCost = proposedTaskCount *
metrics.getTaskDurationSeconds() * predictedIdleRatio;
+ final double predictedIdleRatio = estimateIdleRatio(metrics,
proposedTaskCount);
+ final double idleCost = proposedTaskCount * predictedIdleRatio;
final double lagCost = config.getLagWeight() * lagRecoveryTime;
final double weightedIdleCost = config.getIdleWeight() * idleCost;
final double cost = lagCost + weightedIdleCost;
@@ -97,17 +99,13 @@ public class WeightedCostFunction
}
/**
- * Estimates the idle ratio for a proposed task count.
- * Includes lag-based adjustment to suppress predicted idle when lag exceeds
the threshold,
- * encouraging scale-up when there is real work to do.
- * The algorithm is adjusted with {@code computeExtraPPTIncrease}, so they
may work in tandem, when enabled.
+ * Estimates the idle ratio for a proposed task count with linear prediction.
*
* @param metrics current system metrics containing idle ratio and task
count
* @param taskCount target task count to estimate an idle ratio for
* @return estimated idle ratio in range [0.0, 1.0]
*/
- @SuppressWarnings("ExtractMethodRecommender")
- private double estimateIdleRatio(CostMetrics metrics, int taskCount, int
highLagThreshold)
+ private double estimateIdleRatio(CostMetrics metrics, int taskCount)
{
final double currentPollIdleRatio = metrics.getPollIdleRatio();
@@ -126,17 +124,8 @@ public class WeightedCostFunction
final double taskRatio = (double) taskCount / currentTaskCount;
final double linearPrediction = Math.max(0.0, Math.min(1.0, 1.0 -
busyFraction / taskRatio));
- final double lagPerPartition = metrics.getAggregateLag() /
metrics.getPartitionCount();
- double lagBusyFactor = 0.;
-
- // Lag-amplified idle decay using ln(lagSeverity) / ln(maxSeverity).
- if (highLagThreshold > 0 && lagPerPartition >= highLagThreshold) {
- final double lagSeverity = lagPerPartition / highLagThreshold;
- lagBusyFactor = Math.min(1.0, Math.log(lagSeverity) /
Math.log(LAG_AMPLIFICATION_MAX_SEVERITY));
- }
-
// Clamp to valid range [0, 1]
- return Math.max(0.0, linearPrediction * (1.0 - lagBusyFactor));
+ return Math.max(0.0, linearPrediction);
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
index c5b2b867e66..48285c6efc4 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
@@ -84,10 +84,10 @@ public class WeightedCostFunctionTest
}
@Test
- public void testLagCostWithMarginalModel()
+ public void testLagCostWithAbsoluteModel()
{
- // With lag-only config (no idle penalty), the marginal model is used for
scale-up:
- // lagRecoveryTime = aggregateLag / (taskCountDiff * rate)
+ // With lag-only config (no idle penalty), cost uses the absolute model:
+ // lagRecoveryTime = aggregateLag * amplification / (taskCount * rate)
CostBasedAutoScalerConfig lagOnlyConfig =
CostBasedAutoScalerConfig.builder()
.taskCountMax(100)
.taskCountMin(1)
@@ -97,20 +97,19 @@ public class WeightedCostFunctionTest
.defaultProcessingRate(1000.0)
.build();
- // aggregateLag = 100000 * 100 = 10,000,000
+ // aggregateLag = 100000 * 100 = 10,000,000; lagPerPartition = 100,000
CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3);
+ double aggregateLag = 100000.0 * 100;
+ double amplification = 1.0 + 0.2 * Math.log(aggregateLag / 100);
- // Current (10 tasks): uses absolute model = 10M / (10 * 1000) = 1000s
double costCurrent = costFunction.computeCost(metrics, 10,
lagOnlyConfig).totalCost();
- Assert.assertEquals("Cost of current tasks", 1000., costCurrent, 0.1);
+ Assert.assertEquals("Cost of current tasks", aggregateLag * amplification
/ (10 * 1000.0), costCurrent, 0.1);
- // Scale up by 5 (to 15): marginal model = 10M / (15 * 1000) = 666
double costUp5 = costFunction.computeCost(metrics, 15,
lagOnlyConfig).totalCost();
- Assert.assertEquals("Cost when scaling up by 5", 666.7, costUp5, 0.1);
+ Assert.assertEquals("Cost when scaling up by 5", aggregateLag *
amplification / (15 * 1000.0), costUp5, 0.1);
- // Scale up by 10 (to 20): marginal model = 10M / (20 * 1000) = 500s
double costUp10 = costFunction.computeCost(metrics, 20,
lagOnlyConfig).totalCost();
- Assert.assertEquals("Cost when scaling up by 10", 500.0, costUp10, 0.01);
+ Assert.assertEquals("Cost when scaling up by 10", aggregateLag *
amplification / (20 * 1000.0), costUp10, 0.1);
// Adding more tasks reduces lag recovery time
Assert.assertTrue("Adding more tasks reduces lag cost", costUp10 <
costUp5);
@@ -257,7 +256,7 @@ public class WeightedCostFunctionTest
CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4);
double costAt2 = costFunction.computeCost(metrics, 2,
idleOnlyConfig).totalCost();
- // idlenessCost = taskCount * taskDuration * 0.0 (clamped) = 0
+ // idlenessCost = taskCount * 0.0 (clamped) = 0
Assert.assertEquals("Idle cost should be 0 when predicted idle is clamped
to 0", 0.0, costAt2, 0.0001);
// Extreme scale-up shouldn't exceed 1.0 for idle ratio
@@ -266,7 +265,6 @@ public class WeightedCostFunctionTest
// predictedIdle = 1 - 0.9/10 = 1 - 0.09 = 0.91 (within bounds)
CostMetrics lowIdle = createMetrics(0.0, 10, 100, 0.1);
double costAt100 = costFunction.computeCost(lowIdle, 100,
idleOnlyConfig).totalCost();
- // idlenessCost = 100 * 3600 * 0.91 = 327600
Assert.assertTrue("Cost should be finite and positive",
Double.isFinite(costAt100) && costAt100 > 0);
}
@@ -289,118 +287,78 @@ public class WeightedCostFunctionTest
double cost20 = costFunction.computeCost(missingIdleData, 20,
idleOnlyConfig).totalCost();
// With missing data, predicted idle = 0.5 for all task counts
- // idlenessCost at 10 = 10 * 3600 * 0.5 = 18000
- // idlenessCost at 20 = 20 * 3600 * 0.5 = 36000
- Assert.assertEquals("Cost at 10 tasks with missing idle data", 10 * 3600 *
0.5, cost10, 0.0001);
- Assert.assertEquals("Cost at 20 tasks with missing idle data", 20 * 3600 *
0.5, cost20, 0.0001);
+ // idlenessCost at 10 = 10 * 0.5 = 5
+ // idlenessCost at 20 = 20 * 0.5 = 10
+ Assert.assertEquals("Cost at 10 tasks with missing idle data", 10 * 0.5,
cost10, 0.0001);
+ Assert.assertEquals("Cost at 20 tasks with missing idle data", 20 * 0.5,
cost20, 0.0001);
}
@Test
- public void testLagAmplificationReducesIdleUnderHighLag()
+ public void testLagAmplificationAppliedUnconditionally()
{
- CostBasedAutoScalerConfig configWithThreshold =
CostBasedAutoScalerConfig.builder()
-
.taskCountMax(100)
-
.taskCountMin(1)
-
.enableTaskAutoScaler(true)
-
.defaultProcessingRate(1000.0)
-
.highLagThreshold(10_000)
-
.build();
-
- int currentTaskCount = 3;
- int proposedTaskCount = 8;
- int partitionCount = 30;
- double pollIdleRatio = 0.1;
-
- // lowLag (5000) is below threshold, highLag (500000) is well above
- CostMetrics lowLag = createMetrics(5_000.0, currentTaskCount,
partitionCount, pollIdleRatio);
- CostMetrics highLag = createMetrics(500_000.0, currentTaskCount,
partitionCount, pollIdleRatio);
-
- double lowLagCost = costFunction.computeCost(lowLag, proposedTaskCount,
configWithThreshold).totalCost();
- double highLagCost = costFunction.computeCost(highLag, proposedTaskCount,
configWithThreshold).totalCost();
- Assert.assertTrue(
- "Higher lag should reduce predicted idle more aggressively",
- lowLagCost > highLagCost
- );
- }
+ CostBasedAutoScalerConfig lagOnly = CostBasedAutoScalerConfig.builder()
+
.taskCountMax(100)
+
.taskCountMin(1)
+
.enableTaskAutoScaler(true)
+
.lagWeight(1.0)
+
.idleWeight(0.0)
+
.defaultProcessingRate(1000.0)
+ .build();
- @Test
- public void testCustomLagThresholdsAffectCostCalculation()
- {
- // Test that custom threshold values change behavior compared to defaults
- int currentTaskCount = 3;
- int proposedTaskCount = 8;
- int partitionCount = 30;
+ int currentTaskCount = 10;
+ int proposedTaskCount = 10;
+ int partitionCount = 10;
double pollIdleRatio = 0.1;
- // Use lag that exceeds sensitive threshold (10000) but not default (-1,
disabled)
- CostMetrics metrics = createMetrics(15_000.0, currentTaskCount,
partitionCount, pollIdleRatio);
+ // lagPerPartition = 150 * 10 / 10 = 150, amplification = 1 + 0.2 * ln(150)
+ CostMetrics metrics = createMetrics(150.0, currentTaskCount,
partitionCount, pollIdleRatio);
- // Default config: highLagThreshold=-1 (disabled), no lag amplification
- CostBasedAutoScalerConfig defaultConfig =
CostBasedAutoScalerConfig.builder()
-
.taskCountMax(100)
-
.taskCountMin(1)
-
.enableTaskAutoScaler(true)
-
.defaultProcessingRate(1000.0)
-
.build();
-
- // Sensitive config: threshold 10000, lag 15000 > 10000, amplification
happens
- CostBasedAutoScalerConfig sensitiveConfig =
CostBasedAutoScalerConfig.builder()
-
.taskCountMax(100)
-
.taskCountMin(1)
-
.enableTaskAutoScaler(true)
-
.defaultProcessingRate(1000.0)
-
.highLagThreshold(10000)
-
.build();
+ double costWithAmp = costFunction.computeCost(metrics, proposedTaskCount,
lagOnly).totalCost();
- double defaultCost = costFunction.computeCost(metrics, proposedTaskCount,
defaultConfig).totalCost();
- double sensitiveCost = costFunction.computeCost(metrics,
proposedTaskCount, sensitiveConfig).totalCost();
+ double aggregateLag = 150.0 * partitionCount;
+ double lagPerPartition = aggregateLag / partitionCount;
+ double amplification = 1.0 + 0.2 * Math.log(lagPerPartition);
+ double expected = aggregateLag * amplification / (proposedTaskCount *
1000.0);
- // With lower thresholds, the same lag triggers more aggressive scaling
behavior
- // (higher lagBusyFactor), which results in lower predicted idle and thus
lower idle cost
- Assert.assertTrue(
- "More sensitive thresholds should result in different (lower) cost",
- sensitiveCost < defaultCost
- );
+ Assert.assertEquals("Lag amplification should increase lag recovery time",
expected, costWithAmp, 0.0001);
}
@Test
- public void testLnSeverityScalesWithLag()
+ public void testAmplificationGrowsWithLag()
{
- // Test that ln_severity lagBusyFactor increases with lag severity,
- // producing lower idle cost at higher lag.
- // lagSeverity = lagPerPartition / threshold
- // lagBusyFactor = min(1.0, ln(lagSeverity) / ln(5))
- int currentTaskCount = 3;
- int proposedTaskCount = 8;
+ // Verify that higher lag produces more-than-proportionally higher cost due to
log amplification
+ CostBasedAutoScalerConfig lagOnly = CostBasedAutoScalerConfig.builder()
+
.taskCountMax(100)
+
.taskCountMin(1)
+
.enableTaskAutoScaler(true)
+
.lagWeight(1.0)
+
.idleWeight(0.0)
+
.defaultProcessingRate(1000.0)
+ .build();
+
+ int currentTaskCount = 10;
+ int proposedTaskCount = 10;
int partitionCount = 30;
double pollIdleRatio = 0.1;
- CostBasedAutoScalerConfig customConfig =
CostBasedAutoScalerConfig.builder()
-
.taskCountMax(100)
-
.taskCountMin(1)
-
.enableTaskAutoScaler(true)
-
.defaultProcessingRate(1000.0)
-
.highLagThreshold(10000)
-
.build();
-
- // Lag exactly at threshold (lagPerPartition = 10000, severity=1.0)
- // lagBusyFactor = ln(1) / ln(5) = 0
- CostMetrics atThreshold = createMetrics(10_000.0, currentTaskCount,
partitionCount, pollIdleRatio);
+ CostMetrics lowLag = createMetrics(100.0, currentTaskCount,
partitionCount, pollIdleRatio);
+ CostMetrics highLag = createMetrics(10_000.0, currentTaskCount,
partitionCount, pollIdleRatio);
- // Lag at 5x threshold (lagPerPartition = 50000, severity=5.0)
- // lagBusyFactor = ln(5) / ln(5) = 1.0
- CostMetrics atMaxSeverity = createMetrics(50_000.0, currentTaskCount,
partitionCount, pollIdleRatio);
+ double lowCost = costFunction.computeCost(lowLag, proposedTaskCount,
lagOnly).totalCost();
+ double highCost = costFunction.computeCost(highLag, proposedTaskCount,
lagOnly).totalCost();
- double costAtThreshold = costFunction.computeCost(atThreshold,
proposedTaskCount, customConfig).totalCost();
- double costAtMax = costFunction.computeCost(atMaxSeverity,
proposedTaskCount, customConfig).totalCost();
+ Assert.assertTrue("Higher lag should produce higher cost", highCost >
lowCost);
- // At max severity, lagBusyFactor=1.0, idle is fully suppressed → lower
cost
+ // The ratio of costs should be more than the ratio of raw lags (due to
amplification)
+ double lagRatio = 10_000.0 / 100.0;
+ double costRatio = highCost / lowCost;
Assert.assertTrue(
- "Cost at max severity should be lower due to full idle suppression",
- costAtMax < costAtThreshold
+ "Amplification should make cost grow faster than linear with lag",
+ costRatio > lagRatio
);
}
+
private CostMetrics createMetrics(
double avgPartitionLag,
int currentTaskCount,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]