This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 896a692090a Replace idle decay with logarithmic lag acceleration, get 
rid of dominant task duration in cost fn (#18991)
896a692090a is described below

commit 896a692090a466b62c5adf55f1cb2dfb0238c1da
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Tue Feb 10 16:40:03 2026 +0200

    Replace idle decay with logarithmic lag acceleration, get rid of dominant 
task duration in cost fn (#18991)
---
 .../autoscaler/WeightedCostFunction.java           |  39 ++---
 .../autoscaler/WeightedCostFunctionTest.java       | 158 ++++++++-------------
 2 files changed, 72 insertions(+), 125 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
index 01b35242ed8..2fbb365fb99 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
@@ -29,12 +29,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 public class WeightedCostFunction
 {
   private static final Logger log = new Logger(WeightedCostFunction.class);
-  /**
-   * The lag severity at which lagBusyFactor reaches 1.0 (full idle 
suppression).
-   * lagSeverity is defined as lagPerPartition / highLagThreshold.
-   * At severity=1 (threshold), lagBusyFactor=0. At severity=MAX, 
lagBusyFactor=1.0.
-   */
-  private static final int LAG_AMPLIFICATION_MAX_SEVERITY = 5;
+  private static final double LAG_AMPLIFICATION_MULTIPLIER = 0.2;
 
   /**
    * Computes cost for a given task count using compute time metrics.
@@ -73,12 +68,19 @@ public class WeightedCostFunction
       }
     } else {
       // Lag recovery time is decreasing by adding tasks and increasing by 
ejecting tasks.
+      // In case of increasing lag, we apply an amplification factor to 
reflect the urgency of addressing lag.
       // Caution: we rely only on the metrics, the real issues may be 
absolutely different, up to hardware failure.
-      lagRecoveryTime = metrics.getAggregateLag() / (proposedTaskCount * 
avgProcessingRate);
+      if (metrics.getAggregateLag() <= 0) {
+        lagRecoveryTime = 0;
+      } else {
+        final double lagPerPartition = metrics.getAggregateLag() / 
metrics.getPartitionCount();
+        final double amplification = Math.max(1.0, 1.0 + 
LAG_AMPLIFICATION_MULTIPLIER * Math.log(lagPerPartition));
+        lagRecoveryTime = metrics.getAggregateLag() * amplification / 
(proposedTaskCount * avgProcessingRate);
+      }
     }
 
-    final double predictedIdleRatio = estimateIdleRatio(metrics, 
proposedTaskCount, config.getHighLagThreshold());
-    final double idleCost = proposedTaskCount * 
metrics.getTaskDurationSeconds() * predictedIdleRatio;
+    final double predictedIdleRatio = estimateIdleRatio(metrics, 
proposedTaskCount);
+    final double idleCost = proposedTaskCount * predictedIdleRatio;
     final double lagCost = config.getLagWeight() * lagRecoveryTime;
     final double weightedIdleCost = config.getIdleWeight() * idleCost;
     final double cost = lagCost + weightedIdleCost;
@@ -97,17 +99,13 @@ public class WeightedCostFunction
   }
 
   /**
-   * Estimates the idle ratio for a proposed task count.
-   * Includes lag-based adjustment to suppress predicted idle when lag exceeds 
the threshold,
-   * encouraging scale-up when there is real work to do.
-   * The algorithm is adjusted with {@code computeExtraPPTIncrease}, so they 
may work in tandem, when enabled.
+   * Estimates the idle ratio for a proposed task count with linear prediction.
    *
    * @param metrics   current system metrics containing idle ratio and task 
count
    * @param taskCount target task count to estimate an idle ratio for
    * @return estimated idle ratio in range [0.0, 1.0]
    */
-  @SuppressWarnings("ExtractMethodRecommender")
-  private double estimateIdleRatio(CostMetrics metrics, int taskCount, int 
highLagThreshold)
+  private double estimateIdleRatio(CostMetrics metrics, int taskCount)
   {
     final double currentPollIdleRatio = metrics.getPollIdleRatio();
 
@@ -126,17 +124,8 @@ public class WeightedCostFunction
     final double taskRatio = (double) taskCount / currentTaskCount;
     final double linearPrediction = Math.max(0.0, Math.min(1.0, 1.0 - 
busyFraction / taskRatio));
 
-    final double lagPerPartition = metrics.getAggregateLag() / 
metrics.getPartitionCount();
-    double lagBusyFactor = 0.;
-
-    // Lag-amplified idle decay using ln(lagSeverity) / ln(maxSeverity).
-    if (highLagThreshold > 0 && lagPerPartition >= highLagThreshold) {
-      final double lagSeverity = lagPerPartition / highLagThreshold;
-      lagBusyFactor = Math.min(1.0, Math.log(lagSeverity) / 
Math.log(LAG_AMPLIFICATION_MAX_SEVERITY));
-    }
-
     // Clamp to valid range [0, 1]
-    return Math.max(0.0, linearPrediction * (1.0 - lagBusyFactor));
+    return Math.max(0.0, linearPrediction);
   }
 
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
index c5b2b867e66..48285c6efc4 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
@@ -84,10 +84,10 @@ public class WeightedCostFunctionTest
   }
 
   @Test
-  public void testLagCostWithMarginalModel()
+  public void testLagCostWithAbsoluteModel()
   {
-    // With lag-only config (no idle penalty), the marginal model is used for 
scale-up:
-    // lagRecoveryTime = aggregateLag / (taskCountDiff * rate)
+    // With lag-only config (no idle penalty), cost uses absolute model:
+    // lagRecoveryTime = aggregateLag / (taskCount * rate)
     CostBasedAutoScalerConfig lagOnlyConfig = 
CostBasedAutoScalerConfig.builder()
                                                                        
.taskCountMax(100)
                                                                        
.taskCountMin(1)
@@ -97,20 +97,19 @@ public class WeightedCostFunctionTest
                                                                        
.defaultProcessingRate(1000.0)
                                                                        
.build();
 
-    // aggregateLag = 100000 * 100 = 10,000,000
+    // aggregateLag = 100000 * 100 = 10,000,000; lagPerPartition = 100,000
     CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3);
+    double aggregateLag = 100000.0 * 100;
+    double amplification = 1.0 + 0.2 * Math.log(aggregateLag / 100);
 
-    // Current (10 tasks): uses absolute model = 10M / (10 * 1000) = 1000s
     double costCurrent = costFunction.computeCost(metrics, 10, 
lagOnlyConfig).totalCost();
-    Assert.assertEquals("Cost of current tasks", 1000., costCurrent, 0.1);
+    Assert.assertEquals("Cost of current tasks", aggregateLag * amplification 
/ (10 * 1000.0), costCurrent, 0.1);
 
-    // Scale up by 5 (to 15): marginal model = 10M / (15 * 1000) = 666
     double costUp5 = costFunction.computeCost(metrics, 15, 
lagOnlyConfig).totalCost();
-    Assert.assertEquals("Cost when scaling up by 5", 666.7, costUp5, 0.1);
+    Assert.assertEquals("Cost when scaling up by 5", aggregateLag * 
amplification / (15 * 1000.0), costUp5, 0.1);
 
-    // Scale up by 10 (to 20): marginal model = 10M / (20 * 1000) = 500s
     double costUp10 = costFunction.computeCost(metrics, 20, 
lagOnlyConfig).totalCost();
-    Assert.assertEquals("Cost when scaling up by 10", 500.0, costUp10, 0.01);
+    Assert.assertEquals("Cost when scaling up by 10", aggregateLag * 
amplification / (20 * 1000.0), costUp10, 0.1);
 
     // Adding more tasks reduces lag recovery time
     Assert.assertTrue("Adding more tasks reduces lag cost", costUp10 < 
costUp5);
@@ -257,7 +256,7 @@ public class WeightedCostFunctionTest
     CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4);
     double costAt2 = costFunction.computeCost(metrics, 2, 
idleOnlyConfig).totalCost();
 
-    // idlenessCost = taskCount * taskDuration * 0.0 (clamped) = 0
+    // idlenessCost = taskCount * 0.0 (clamped) = 0
     Assert.assertEquals("Idle cost should be 0 when predicted idle is clamped 
to 0", 0.0, costAt2, 0.0001);
 
     // Extreme scale-up shouldn't exceed 1.0 for idle ratio
@@ -266,7 +265,6 @@ public class WeightedCostFunctionTest
     // predictedIdle = 1 - 0.9/10 = 1 - 0.09 = 0.91 (within bounds)
     CostMetrics lowIdle = createMetrics(0.0, 10, 100, 0.1);
     double costAt100 = costFunction.computeCost(lowIdle, 100, 
idleOnlyConfig).totalCost();
-    // idlenessCost = 100 * 3600 * 0.91 = 327600
     Assert.assertTrue("Cost should be finite and positive", 
Double.isFinite(costAt100) && costAt100 > 0);
   }
 
@@ -289,118 +287,78 @@ public class WeightedCostFunctionTest
     double cost20 = costFunction.computeCost(missingIdleData, 20, 
idleOnlyConfig).totalCost();
 
     // With missing data, predicted idle = 0.5 for all task counts
-    // idlenessCost at 10 = 10 * 3600 * 0.5 = 18000
-    // idlenessCost at 20 = 20 * 3600 * 0.5 = 36000
-    Assert.assertEquals("Cost at 10 tasks with missing idle data", 10 * 3600 * 
0.5, cost10, 0.0001);
-    Assert.assertEquals("Cost at 20 tasks with missing idle data", 20 * 3600 * 
0.5, cost20, 0.0001);
+    // idlenessCost at 10 = 10 * 0.5 = 5
+    // idlenessCost at 20 = 20 * 0.5 = 10
+    Assert.assertEquals("Cost at 10 tasks with missing idle data", 10 * 0.5, 
cost10, 0.0001);
+    Assert.assertEquals("Cost at 20 tasks with missing idle data", 20 * 0.5, 
cost20, 0.0001);
   }
 
   @Test
-  public void testLagAmplificationReducesIdleUnderHighLag()
+  public void testLagAmplificationAppliedUnconditionally()
   {
-    CostBasedAutoScalerConfig configWithThreshold = 
CostBasedAutoScalerConfig.builder()
-                                                                        
.taskCountMax(100)
-                                                                        
.taskCountMin(1)
-                                                                        
.enableTaskAutoScaler(true)
-                                                                        
.defaultProcessingRate(1000.0)
-                                                                        
.highLagThreshold(10_000)
-                                                                        
.build();
-
-    int currentTaskCount = 3;
-    int proposedTaskCount = 8;
-    int partitionCount = 30;
-    double pollIdleRatio = 0.1;
-
-    // lowLag (5000) is below threshold, highLag (500000) is well above
-    CostMetrics lowLag = createMetrics(5_000.0, currentTaskCount, 
partitionCount, pollIdleRatio);
-    CostMetrics highLag = createMetrics(500_000.0, currentTaskCount, 
partitionCount, pollIdleRatio);
-
-    double lowLagCost = costFunction.computeCost(lowLag, proposedTaskCount, 
configWithThreshold).totalCost();
-    double highLagCost = costFunction.computeCost(highLag, proposedTaskCount, 
configWithThreshold).totalCost();
-    Assert.assertTrue(
-        "Higher lag should reduce predicted idle more aggressively",
-        lowLagCost > highLagCost
-    );
-  }
+    CostBasedAutoScalerConfig lagOnly = CostBasedAutoScalerConfig.builder()
+                                                                 
.taskCountMax(100)
+                                                                 
.taskCountMin(1)
+                                                                 
.enableTaskAutoScaler(true)
+                                                                 
.lagWeight(1.0)
+                                                                 
.idleWeight(0.0)
+                                                                 
.defaultProcessingRate(1000.0)
+                                                                 .build();
 
-  @Test
-  public void testCustomLagThresholdsAffectCostCalculation()
-  {
-    // Test that custom threshold values change behavior compared to defaults
-    int currentTaskCount = 3;
-    int proposedTaskCount = 8;
-    int partitionCount = 30;
+    int currentTaskCount = 10;
+    int proposedTaskCount = 10;
+    int partitionCount = 10;
     double pollIdleRatio = 0.1;
 
-    // Use lag that exceeds sensitive threshold (10000) but not default (-1, 
disabled)
-    CostMetrics metrics = createMetrics(15_000.0, currentTaskCount, 
partitionCount, pollIdleRatio);
+    // lagPerPartition = 150 * 10 / 10 = 150, amplification = 1 + 0.2 * ln(150)
+    CostMetrics metrics = createMetrics(150.0, currentTaskCount, 
partitionCount, pollIdleRatio);
 
-    // Default config: highLagThreshold=-1 (disabled), no lag amplification
-    CostBasedAutoScalerConfig defaultConfig = 
CostBasedAutoScalerConfig.builder()
-                                                                        
.taskCountMax(100)
-                                                                        
.taskCountMin(1)
-                                                                        
.enableTaskAutoScaler(true)
-                                                                        
.defaultProcessingRate(1000.0)
-                                                                        
.build();
-
-    // Sensitive config: threshold 10000, lag 15000 > 10000, amplification 
happens
-    CostBasedAutoScalerConfig sensitiveConfig = 
CostBasedAutoScalerConfig.builder()
-                                                                          
.taskCountMax(100)
-                                                                          
.taskCountMin(1)
-                                                                          
.enableTaskAutoScaler(true)
-                                                                          
.defaultProcessingRate(1000.0)
-                                                                          
.highLagThreshold(10000)
-                                                                          
.build();
+    double costWithAmp = costFunction.computeCost(metrics, proposedTaskCount, 
lagOnly).totalCost();
 
-    double defaultCost = costFunction.computeCost(metrics, proposedTaskCount, 
defaultConfig).totalCost();
-    double sensitiveCost = costFunction.computeCost(metrics, 
proposedTaskCount, sensitiveConfig).totalCost();
+    double aggregateLag = 150.0 * partitionCount;
+    double lagPerPartition = aggregateLag / partitionCount;
+    double amplification = 1.0 + 0.2 * Math.log(lagPerPartition);
+    double expected = aggregateLag * amplification / (proposedTaskCount * 
1000.0);
 
-    // With lower thresholds, the same lag triggers more aggressive scaling 
behavior
-    // (higher lagBusyFactor), which results in lower predicted idle and thus 
lower idle cost
-    Assert.assertTrue(
-        "More sensitive thresholds should result in different (lower) cost",
-        sensitiveCost < defaultCost
-    );
+    Assert.assertEquals("Lag amplification should increase lag recovery time", 
expected, costWithAmp, 0.0001);
   }
 
   @Test
-  public void testLnSeverityScalesWithLag()
+  public void testAmplificationGrowsWithLag()
   {
-    // Test that ln_severity lagBusyFactor increases with lag severity,
-    // producing lower idle cost at higher lag.
-    // lagSeverity = lagPerPartition / threshold
-    // lagBusyFactor = min(1.0, ln(lagSeverity) / ln(5))
-    int currentTaskCount = 3;
-    int proposedTaskCount = 8;
+    // Verify that higher lag produces proportionally higher cost due to log 
amplification
+    CostBasedAutoScalerConfig lagOnly = CostBasedAutoScalerConfig.builder()
+                                                                 
.taskCountMax(100)
+                                                                 
.taskCountMin(1)
+                                                                 
.enableTaskAutoScaler(true)
+                                                                 
.lagWeight(1.0)
+                                                                 
.idleWeight(0.0)
+                                                                 
.defaultProcessingRate(1000.0)
+                                                                 .build();
+
+    int currentTaskCount = 10;
+    int proposedTaskCount = 10;
     int partitionCount = 30;
     double pollIdleRatio = 0.1;
 
-    CostBasedAutoScalerConfig customConfig = 
CostBasedAutoScalerConfig.builder()
-                                                                       
.taskCountMax(100)
-                                                                       
.taskCountMin(1)
-                                                                       
.enableTaskAutoScaler(true)
-                                                                       
.defaultProcessingRate(1000.0)
-                                                                       
.highLagThreshold(10000)
-                                                                       
.build();
-
-    // Lag exactly at threshold (lagPerPartition = 10000, severity=1.0)
-    // lagBusyFactor = ln(1) / ln(5) = 0
-    CostMetrics atThreshold = createMetrics(10_000.0, currentTaskCount, 
partitionCount, pollIdleRatio);
+    CostMetrics lowLag = createMetrics(100.0, currentTaskCount, 
partitionCount, pollIdleRatio);
+    CostMetrics highLag = createMetrics(10_000.0, currentTaskCount, 
partitionCount, pollIdleRatio);
 
-    // Lag at 5x threshold (lagPerPartition = 50000, severity=5.0)
-    // lagBusyFactor = ln(5) / ln(5) = 1.0
-    CostMetrics atMaxSeverity = createMetrics(50_000.0, currentTaskCount, 
partitionCount, pollIdleRatio);
+    double lowCost = costFunction.computeCost(lowLag, proposedTaskCount, 
lagOnly).totalCost();
+    double highCost = costFunction.computeCost(highLag, proposedTaskCount, 
lagOnly).totalCost();
 
-    double costAtThreshold = costFunction.computeCost(atThreshold, 
proposedTaskCount, customConfig).totalCost();
-    double costAtMax = costFunction.computeCost(atMaxSeverity, 
proposedTaskCount, customConfig).totalCost();
+    Assert.assertTrue("Higher lag should produce higher cost", highCost > 
lowCost);
 
-    // At max severity, lagBusyFactor=1.0, idle is fully suppressed → lower 
cost
+    // The ratio of costs should be more than the ratio of raw lags (due to 
amplification)
+    double lagRatio = 10_000.0 / 100.0;
+    double costRatio = highCost / lowCost;
     Assert.assertTrue(
-        "Cost at max severity should be lower due to full idle suppression",
-        costAtMax < costAtThreshold
+        "Amplification should make cost grow faster than linear with lag",
+        costRatio > lagRatio
     );
   }
 
+
   private CostMetrics createMetrics(
       double avgPartitionLag,
       int currentTaskCount,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to