This is an automated email from the ASF dual-hosted git repository.

capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ed0264a54d7 Add configurable option to scale-down during task run time 
for cost-based autoscaler (#18958)
ed0264a54d7 is described below

commit ed0264a54d74b4de318a214c58090756905cfc00
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Wed Jan 28 17:14:37 2026 +0200

    Add configurable option to scale-down during task run time for cost-based 
autoscaler (#18958)
    
    * Add configurable option to scale-down during task run time for cost-based 
autoscaler
    
    * Docs
    
    * Address review comments, compress tests a bit
---
 docs/ingestion/supervisor.md                       |   2 +
 .../CostBasedAutoScalerIntegrationTest.java        |   5 +
 .../supervisor/autoscaler/CostBasedAutoScaler.java |  20 ++-
 .../autoscaler/CostBasedAutoScalerConfig.java      |  70 ++++++--
 .../autoscaler/CostBasedAutoScalerConfigTest.java  |  62 ++++++-
 .../autoscaler/CostBasedAutoScalerMockTest.java    | 140 +++++++++++++--
 .../autoscaler/CostBasedAutoScalerTest.java        | 197 +++++++--------------
 7 files changed, 332 insertions(+), 164 deletions(-)

diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 26a3aaea71b..b6be49cec9b 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -208,6 +208,8 @@ The following table outlines the configuration properties 
related to the `costBa
 |`lagWeight`|The weight of extracted lag value in cost function.| No| 0.25|
 |`idleWeight`|The weight of extracted poll idle value in cost function. | No | 
0.75 |
 |`defaultProcessingRate`|A planned processing rate per task, required for 
first cost estimations. | No | 1000 |
+|`scaleDownBarrier`| The number of scale-down recommendations that must 
accumulate before the auto-scaler actually scales down, which prevents it from scaling down tasks immediately. | 
No | 5 |
+|`scaleDownDuringTaskRolloverOnly`| Indicates whether task scaling down is 
limited to periods during task rollovers only. When `false`, scaling down can also happen during task run time, subject to `scaleDownBarrier`. | No | `false` |
 
 The following example shows a supervisor spec with `lagBased` autoscaler:
 
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
index 140c4626d22..ee9712f8f82 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
@@ -132,6 +132,8 @@ public class CostBasedAutoScalerIntegrationTest extends 
EmbeddedClusterTestBase
         // Weight configuration: strongly favor lag reduction over idle time
         .lagWeight(0.9)
         .idleWeight(0.1)
+        .scaleDownDuringTaskRolloverOnly(false)
+        .scaleDownBarrier(1)
         .build();
 
     final KafkaSupervisorSpec spec = 
createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, 
initialTaskCount);
@@ -225,6 +227,9 @@ public class CostBasedAutoScalerIntegrationTest extends 
EmbeddedClusterTestBase
         // High idle weight ensures scale-down when tasks are mostly idle 
(little data to process)
         .lagWeight(0.1)
         .idleWeight(0.9)
+        .scaleDownDuringTaskRolloverOnly(false)
+        // Do not delay scale-downs
+        .scaleDownBarrier(0)
         .build();
 
     final KafkaSupervisorSpec spec = 
createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, 
initialTaskCount);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 350e2284359..dcdeea0ccd0 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -90,6 +90,8 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
   private final WeightedCostFunction costFunction;
   private volatile CostMetrics lastKnownMetrics;
 
+  private int scaleDownCounter = 0;
+
   public CostBasedAutoScaler(
       SeekableStreamSupervisor supervisor,
       CostBasedAutoScalerConfig config,
@@ -148,7 +150,12 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
   @Override
   public int computeTaskCountForRollover()
   {
-    return computeOptimalTaskCount(lastKnownMetrics);
+    if (config.isScaleDownOnTaskRolloverOnly()) {
+      return computeOptimalTaskCount(lastKnownMetrics);
+    } else {
+      scaleDownCounter = 0;
+      return -1;
+    }
   }
 
   public int computeTaskCountForScaleAction()
@@ -157,11 +164,18 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
     final int currentTaskCount = lastKnownMetrics.getCurrentTaskCount();
 
-    // Perform only scale-up actions
+    // Perform scale-up actions; scale-down actions only if configured.
     int taskCount = -1;
     if (optimalTaskCount > currentTaskCount) {
       taskCount = optimalTaskCount;
-      log.info("New task count [%d] on supervisor [%s]", taskCount, 
supervisorId);
+      scaleDownCounter = 0; // Nullify the scaleDown counter after a 
successful scaleup too.
+      log.info("New task count [%d] on supervisor [%s], scaling up", 
taskCount, supervisorId);
+    } else if (!config.isScaleDownOnTaskRolloverOnly()
+               && optimalTaskCount < currentTaskCount
+               && ++scaleDownCounter >= config.getScaleDownBarrier()) {
+      taskCount = optimalTaskCount;
+      scaleDownCounter = 0;
+      log.info("New task count [%d] on supervisor [%s], scaling down", 
taskCount, supervisorId);
     } else {
       log.info("No scaling required for supervisor [%s]", supervisorId);
     }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
index aba26ba25b5..d835fea5157 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
@@ -43,11 +43,12 @@ import java.util.Objects;
 @JsonInclude(JsonInclude.Include.NON_NULL)
 public class CostBasedAutoScalerConfig implements AutoScalerConfig
 {
-  private static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 15 * 60 * 
1000; // 15 minutes
-  private static final long DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS 
= 1200000; // 20 minutes
-  private static final double DEFAULT_LAG_WEIGHT = 0.25;
-  private static final double DEFAULT_IDLE_WEIGHT = 0.75;
-  private static final double DEFAULT_PROCESSING_RATE = 1000.0; // 1000 
records/sec per task as default
+  static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 10 * 60 * 1000; // 10 
minutes
+  static final long DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS = 5 * 60 
* 1000; // 5 minutes
+  static final double DEFAULT_LAG_WEIGHT = 0.25;
+  static final double DEFAULT_IDLE_WEIGHT = 0.75;
+  static final double DEFAULT_PROCESSING_RATE = 1000.0; // 1000 records/sec 
per task as default
+  static final int DEFAULT_SCALE_DOWN_BARRIER = 5; // We delay scale down by 5 
* DEFAULT_SCALE_ACTION_PERIOD_MILLIS
 
   private final boolean enableTaskAutoScaler;
   private final int taskCountMax;
@@ -60,6 +61,18 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
   private final double lagWeight;
   private final double idleWeight;
   private final double defaultProcessingRate;
+  /**
+   * Number of scale-down recommendations that must accumulate before the 
auto-scaler actually scales down tasks,
+   * i.e. the value a counter of such recommendations must reach; it is not a threshold on the cost metrics.
+   * A higher value implies a more conservative scaling down behavior, 
ensuring that tasks
+   * are not prematurely terminated in scenarios of potential workload spikes 
or insufficient cost savings.
+   */
+  private final int scaleDownBarrier;
+  /**
+   * Indicates whether task scaling down is limited to periods during task 
rollovers only.
+   * If set to {@code false}, allows scaling down during normal task run time.
+   */
+  private final boolean scaleDownDuringTaskRolloverOnly;
 
   @JsonCreator
   public CostBasedAutoScalerConfig(
@@ -72,7 +85,9 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
       @Nullable @JsonProperty("scaleActionPeriodMillis") Long 
scaleActionPeriodMillis,
       @Nullable @JsonProperty("lagWeight") Double lagWeight,
       @Nullable @JsonProperty("idleWeight") Double idleWeight,
-      @Nullable @JsonProperty("defaultProcessingRate") Double 
defaultProcessingRate
+      @Nullable @JsonProperty("defaultProcessingRate") Double 
defaultProcessingRate,
+      @Nullable @JsonProperty("scaleDownBarrier") Integer scaleDownBarrier,
+      @Nullable @JsonProperty("scaleDownDuringTaskRolloverOnly") Boolean 
scaleDownDuringTaskRolloverOnly
   )
   {
     this.enableTaskAutoScaler = enableTaskAutoScaler != null ? 
enableTaskAutoScaler : false;
@@ -90,6 +105,8 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
     this.lagWeight = Configs.valueOrDefault(lagWeight, DEFAULT_LAG_WEIGHT);
     this.idleWeight = Configs.valueOrDefault(idleWeight, DEFAULT_IDLE_WEIGHT);
     this.defaultProcessingRate = Configs.valueOrDefault(defaultProcessingRate, 
DEFAULT_PROCESSING_RATE);
+    this.scaleDownBarrier = Configs.valueOrDefault(scaleDownBarrier, 
DEFAULT_SCALE_DOWN_BARRIER);
+    this.scaleDownDuringTaskRolloverOnly = 
Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false);
 
     if (this.enableTaskAutoScaler) {
       Preconditions.checkNotNull(taskCountMax, "taskCountMax is required when 
enableTaskAutoScaler is true");
@@ -107,17 +124,16 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
     }
     this.taskCountStart = taskCountStart;
 
-    // Validate stopTaskCountRatio
     Preconditions.checkArgument(
         stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 && 
stopTaskCountRatio <= 1.0),
         "0.0 < stopTaskCountRatio <= 1.0"
     );
     this.stopTaskCountRatio = stopTaskCountRatio;
 
-    // Validate weights are non-negative
     Preconditions.checkArgument(this.lagWeight >= 0, "lagWeight must be >= 0");
     Preconditions.checkArgument(this.idleWeight >= 0, "idleWeight must be >= 
0");
     Preconditions.checkArgument(this.defaultProcessingRate > 0, 
"defaultProcessingRate must be > 0");
+    Preconditions.checkArgument(this.scaleDownBarrier >= 0, "scaleDownBarrier 
must be >= 0");
   }
 
   /**
@@ -196,6 +212,18 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
     return defaultProcessingRate;
   }
 
+  @JsonProperty
+  public int getScaleDownBarrier()
+  {
+    return scaleDownBarrier;
+  }
+
+  @JsonProperty("scaleDownDuringTaskRolloverOnly")
+  public boolean isScaleDownOnTaskRolloverOnly()
+  {
+    return scaleDownDuringTaskRolloverOnly;
+  }
+
   @Override
   public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, 
SupervisorSpec spec, ServiceEmitter emitter)
   {
@@ -222,6 +250,8 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
            && Double.compare(that.lagWeight, lagWeight) == 0
            && Double.compare(that.idleWeight, idleWeight) == 0
            && Double.compare(that.defaultProcessingRate, 
defaultProcessingRate) == 0
+           && scaleDownBarrier == that.scaleDownBarrier
+           && scaleDownDuringTaskRolloverOnly == 
that.scaleDownDuringTaskRolloverOnly
            && Objects.equals(taskCountStart, that.taskCountStart)
            && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio);
   }
@@ -239,7 +269,9 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
         scaleActionPeriodMillis,
         lagWeight,
         idleWeight,
-        defaultProcessingRate
+        defaultProcessingRate,
+        scaleDownBarrier,
+        scaleDownDuringTaskRolloverOnly
     );
   }
 
@@ -257,6 +289,8 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
            ", lagWeight=" + lagWeight +
            ", idleWeight=" + idleWeight +
            ", defaultProcessingRate=" + defaultProcessingRate +
+           ", scaleDownBarrier=" + scaleDownBarrier +
+           ", scaleDownDuringTaskRolloverOnly=" + 
scaleDownDuringTaskRolloverOnly +
            '}';
   }
 
@@ -276,6 +310,8 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
     private Double lagWeight;
     private Double idleWeight;
     private Double defaultProcessingRate;
+    private Integer scaleDownBarrier;
+    private Boolean scaleDownDuringTaskRolloverOnly;
 
     private Builder()
     {
@@ -341,6 +377,18 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
       return this;
     }
 
+    public Builder scaleDownBarrier(int scaleDownBarrier)
+    {
+      this.scaleDownBarrier = scaleDownBarrier;
+      return this;
+    }
+
+    public Builder scaleDownDuringTaskRolloverOnly(boolean 
scaleDownDuringTaskRolloverOnly)
+    {
+      this.scaleDownDuringTaskRolloverOnly = scaleDownDuringTaskRolloverOnly;
+      return this;
+    }
+
     public CostBasedAutoScalerConfig build()
     {
       return new CostBasedAutoScalerConfig(
@@ -353,7 +401,9 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
           scaleActionPeriodMillis,
           lagWeight,
           idleWeight,
-          defaultProcessingRate
+          defaultProcessingRate,
+          scaleDownBarrier,
+          scaleDownDuringTaskRolloverOnly
       );
     }
   }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
index b585667adcf..364e0d75b80 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
@@ -24,6 +24,13 @@ import org.apache.druid.jackson.DefaultObjectMapper;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_IDLE_WEIGHT;
+import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_LAG_WEIGHT;
+import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS;
+import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_PROCESSING_RATE;
+import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_SCALE_ACTION_PERIOD_MILLIS;
+import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_SCALE_DOWN_BARRIER;
+
 public class CostBasedAutoScalerConfigTest
 {
   private final ObjectMapper mapper = new DefaultObjectMapper();
@@ -42,7 +49,9 @@ public class CostBasedAutoScalerConfigTest
                   + "  \"scaleActionPeriodMillis\": 60000,\n"
                   + "  \"lagWeight\": 0.6,\n"
                   + "  \"idleWeight\": 0.4,\n"
-                  + "  \"distancePenaltyExponent\": 3.0\n"
+                  + "  \"defaultProcessingRate\": 2000.0,\n"
+                  + "  \"scaleDownBarrier\": 10,\n"
+                  + "  \"scaleDownDuringTaskRolloverOnly\": true\n"
                   + "}";
 
     CostBasedAutoScalerConfig config = mapper.readValue(json, 
CostBasedAutoScalerConfig.class);
@@ -56,6 +65,9 @@ public class CostBasedAutoScalerConfigTest
     Assert.assertEquals(60000L, config.getScaleActionPeriodMillis());
     Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
     Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
+    Assert.assertEquals(2000.0, config.getDefaultProcessingRate(), 0.001);
+    Assert.assertEquals(10, config.getScaleDownBarrier());
+    Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
 
     // Test serialization back to JSON
     String serialized = mapper.writeValueAsString(config);
@@ -81,10 +93,15 @@ public class CostBasedAutoScalerConfigTest
     Assert.assertEquals(2, config.getTaskCountMin());
 
     // Check defaults
-    Assert.assertEquals(900000L, config.getScaleActionPeriodMillis());
-    Assert.assertEquals(1200000L, 
config.getMinTriggerScaleActionFrequencyMillis());
-    Assert.assertEquals(0.25, config.getLagWeight(), 0.001);
-    Assert.assertEquals(0.75, config.getIdleWeight(), 0.001);
+    Assert.assertEquals(DEFAULT_SCALE_ACTION_PERIOD_MILLIS, 
config.getScaleActionPeriodMillis());
+    Assert.assertEquals(DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS, 
config.getMinTriggerScaleActionFrequencyMillis());
+    Assert.assertEquals(DEFAULT_LAG_WEIGHT, config.getLagWeight(), 0.001);
+    Assert.assertEquals(DEFAULT_IDLE_WEIGHT, config.getIdleWeight(), 0.001);
+    Assert.assertEquals(DEFAULT_PROCESSING_RATE, 
config.getDefaultProcessingRate(), 0.001);
+    Assert.assertEquals(DEFAULT_SCALE_DOWN_BARRIER, 
config.getScaleDownBarrier());
+    Assert.assertFalse(config.isScaleDownOnTaskRolloverOnly());
+    Assert.assertNull(config.getTaskCountStart());
+    Assert.assertNull(config.getStopTaskCountRatio());
   }
 
   @Test
@@ -98,6 +115,9 @@ public class CostBasedAutoScalerConfigTest
     CostBasedAutoScalerConfig config = mapper.readValue(json, 
CostBasedAutoScalerConfig.class);
 
     Assert.assertFalse(config.getEnableTaskAutoScaler());
+    // When disabled, taskCountMax and taskCountMin default to 0
+    Assert.assertEquals(0, config.getTaskCountMax());
+    Assert.assertEquals(0, config.getTaskCountMin());
   }
 
   @Test(expected = RuntimeException.class)
@@ -149,4 +169,36 @@ public class CostBasedAutoScalerConfigTest
                              .enableTaskAutoScaler(true)
                              .build();
   }
+
+  @Test
+  public void testBuilder()
+  {
+    CostBasedAutoScalerConfig config = CostBasedAutoScalerConfig.builder()
+                                                                 
.taskCountMax(100)
+                                                                 
.taskCountMin(5)
+                                                                 
.taskCountStart(10)
+                                                                 
.enableTaskAutoScaler(true)
+                                                                 
.minTriggerScaleActionFrequencyMillis(600000L)
+                                                                 
.stopTaskCountRatio(0.8)
+                                                                 
.scaleActionPeriodMillis(60000L)
+                                                                 
.lagWeight(0.6)
+                                                                 
.idleWeight(0.4)
+                                                                 
.defaultProcessingRate(2000.0)
+                                                                 
.scaleDownBarrier(10)
+                                                                 
.scaleDownDuringTaskRolloverOnly(true)
+                                                                 .build();
+
+    Assert.assertTrue(config.getEnableTaskAutoScaler());
+    Assert.assertEquals(100, config.getTaskCountMax());
+    Assert.assertEquals(5, config.getTaskCountMin());
+    Assert.assertEquals(Integer.valueOf(10), config.getTaskCountStart());
+    Assert.assertEquals(600000L, 
config.getMinTriggerScaleActionFrequencyMillis());
+    Assert.assertEquals(Double.valueOf(0.8), config.getStopTaskCountRatio());
+    Assert.assertEquals(60000L, config.getScaleActionPeriodMillis());
+    Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
+    Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
+    Assert.assertEquals(2000.0, config.getDefaultProcessingRate(), 0.001);
+    Assert.assertEquals(10, config.getScaleDownBarrier());
+    Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
index 4147a50163d..becfd4964b4 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
@@ -77,23 +77,47 @@ public class CostBasedAutoScalerMockTest
   @Test
   public void testScaleUpWhenOptimalGreaterThanCurrent()
   {
-    // Setup: current = 10, optimal should be higher due to high lag and low 
idle
-    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
+    // Use config with barrier=2 to test counter reset behavior
+    CostBasedAutoScalerConfig barrierConfig = 
CostBasedAutoScalerConfig.builder()
+                                                                        
.taskCountMax(100)
+                                                                        
.taskCountMin(1)
+                                                                        
.enableTaskAutoScaler(true)
+                                                                        
.scaleDownBarrier(2)
+                                                                        
.build();
+
+    CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
+        mockSupervisor,
+        barrierConfig,
+        mockSpec,
+        mockEmitter
+    ));
 
     int currentTaskCount = 10;
-    int expectedOptimalCount = 17; // Higher than current
+    int scaleUpOptimal = 17;
+    int scaleDownOptimal = 5;
 
-    CostMetrics metrics = createMetrics(5000.0, currentTaskCount, 
PARTITION_COUNT, 0.1);
+    // First, increment the scaleDownCounter by making a scale-down attempt
+    doReturn(scaleDownOptimal).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
+    Assert.assertEquals("Scale-down blocked (counter=1)", -1, 
autoScaler.computeTaskCountForScaleAction());
 
-    
doReturn(expectedOptimalCount).when(autoScaler).computeOptimalTaskCount(any());
+    // Now trigger scale-up, which should reset the counter
+    doReturn(scaleUpOptimal).when(autoScaler).computeOptimalTaskCount(any());
     setupMocksForMetricsCollection(currentTaskCount, 5000.0, 0.1);
 
-    int result = autoScaler.computeTaskCountForScaleAction();
-
     Assert.assertEquals(
         "Should return optimal count when it's greater than current 
(scale-up)",
-        expectedOptimalCount,
-        result
+        scaleUpOptimal,
+        autoScaler.computeTaskCountForScaleAction()
+    );
+
+    // Verify counter was reset: scale-down should be blocked again (counter 
starts from 0)
+    doReturn(scaleDownOptimal).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
+    Assert.assertEquals(
+        "Scale-down should be blocked after scale-up reset the counter",
+        -1,
+        autoScaler.computeTaskCountForScaleAction()
     );
   }
 
@@ -116,8 +140,20 @@ public class CostBasedAutoScalerMockTest
   @Test
   public void testScaleDownBlockedReturnsMinusOne()
   {
-    // Scale-down is blocked in computeTaskCountForScaleAction
-    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
+    // Use config with barrier=2 to test counter behavior
+    CostBasedAutoScalerConfig barrierConfig = 
CostBasedAutoScalerConfig.builder()
+                                                                        
.taskCountMax(100)
+                                                                        
.taskCountMin(1)
+                                                                        
.enableTaskAutoScaler(true)
+                                                                        
.scaleDownBarrier(2)
+                                                                        
.build();
+
+    CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
+        mockSupervisor,
+        barrierConfig,
+        mockSpec,
+        mockEmitter
+    ));
 
     int currentTaskCount = 50;
     int optimalCount = 30; // Lower than current (scale-down scenario)
@@ -125,12 +161,25 @@ public class CostBasedAutoScalerMockTest
     doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
     setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
 
-    int result = autoScaler.computeTaskCountForScaleAction();
+    // First attempt: counter=1, blocked
+    Assert.assertEquals(
+        "Should return -1 when optimal is less than current (scale-down 
blocked, counter=1)",
+        -1,
+        autoScaler.computeTaskCountForScaleAction()
+    );
+
+    // Second attempt: counter=2, succeeds (barrier reached)
+    Assert.assertEquals(
+        "Scale-down should succeed when barrier reached",
+        optimalCount,
+        autoScaler.computeTaskCountForScaleAction()
+    );
 
+    // Verify counter was reset: next scale-down should be blocked again
     Assert.assertEquals(
-        "Should return -1 when optimal is less than current (scale-down 
blocked)",
+        "Scale-down should be blocked after successful scale-down reset the 
counter",
         -1,
-        result
+        autoScaler.computeTaskCountForScaleAction()
     );
   }
 
@@ -257,6 +306,69 @@ public class CostBasedAutoScalerMockTest
     );
   }
 
+  @Test
+  public void testScaleDownBlockedWhenScaleDownOnRolloverOnlyEnabled()
+  {
+    CostBasedAutoScalerConfig rolloverOnlyConfig = 
CostBasedAutoScalerConfig.builder()
+                                                                            
.taskCountMax(100)
+                                                                            
.taskCountMin(1)
+                                                                            
.enableTaskAutoScaler(true)
+                                                                            
.scaleDownDuringTaskRolloverOnly(true)
+                                                                            
.scaleDownBarrier(1) // Set the barrier to 1 so it would trigger immediately if 
not blocked
+                                                                            
.build();
+
+    CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
+        mockSupervisor,
+        rolloverOnlyConfig,
+        mockSpec,
+        mockEmitter
+    ));
+
+    int currentTaskCount = 50;
+    int optimalCount = 30; // Lower than current (scale-down scenario)
+
+    doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
+
+    Assert.assertEquals(
+        "Should return -1 when scaleDownDuringTaskRolloverOnly is true",
+        -1,
+        autoScaler.computeTaskCountForScaleAction()
+    );
+  }
+
+  @Test
+  public void 
testScaleDownAllowedDuringRolloverWhenScaleDownOnRolloverOnlyEnabled()
+  {
+    CostBasedAutoScalerConfig rolloverOnlyConfig = 
CostBasedAutoScalerConfig.builder()
+                                                                            
.taskCountMax(100)
+                                                                            
.taskCountMin(1)
+                                                                            
.enableTaskAutoScaler(true)
+                                                                            
.scaleDownDuringTaskRolloverOnly(true)
+                                                                            
.build();
+
+    CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
+        mockSupervisor,
+        rolloverOnlyConfig,
+        mockSpec,
+        mockEmitter
+    ));
+
+    int currentTaskCount = 50;
+    int optimalCount = 30;
+
+    // Set up lastKnownMetrics by calling computeTaskCountForScaleAction first
+    doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
+    autoScaler.computeTaskCountForScaleAction(); // This populates 
lastKnownMetrics
+
+    Assert.assertEquals(
+        "Should scale-down during rollover when 
scaleDownDuringTaskRolloverOnly is true",
+        optimalCount,
+        autoScaler.computeTaskCountForRollover()
+    );
+  }
+
   private void setupMocksForMetricsCollection(int taskCount, double avgLag, 
double pollIdleRatio)
   {
     when(mockSupervisor.computeLagStats()).thenReturn(new LagStats(0, (long) 
avgLag * 2, (long) avgLag));
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 54299d915f6..23192900ec0 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -94,11 +94,8 @@ public class CostBasedAutoScalerTest
     Assert.assertEquals(2, exceedsPartitions.length);
     Assert.assertTrue(contains(exceedsPartitions, 1));
     Assert.assertTrue(contains(exceedsPartitions, 2));
-  }
 
-  @Test
-  public void testComputeValidTaskCountsLagExpansion()
-  {
+    // Lag expansion: low lag should not include max, high lag should
     int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 30);
     Assert.assertFalse("Low lag should not include max task count", 
contains(lowLagCounts, 30));
     Assert.assertTrue("Low lag should cap scale up around 4 tasks", 
contains(lowLagCounts, 4));
@@ -106,12 +103,8 @@ public class CostBasedAutoScalerTest
     long highAggregateLag = 30L * 500_000L;
     int[] highLagCounts = computeValidTaskCounts(30, 3, highAggregateLag, 30);
     Assert.assertTrue("High lag should allow scaling to max tasks", 
contains(highLagCounts, 30));
-  }
 
-  @Test
-  public void testComputeValidTaskCountsRespectsTaskCountMax()
-  {
-    long highAggregateLag = 30L * 500_000L;
+    // Respects taskCountMax
     int[] cappedCounts = computeValidTaskCounts(30, 4, highAggregateLag, 3);
     Assert.assertTrue("Should include taskCountMax when doable", 
contains(cappedCounts, 3));
     Assert.assertFalse("Should not exceed taskCountMax", 
contains(cappedCounts, 4));
@@ -125,6 +118,31 @@ public class CostBasedAutoScalerTest
     double pollIdleRatio = 0.1;
     double avgProcessingRate = 10.0;
 
+    // Create a local autoScaler with taskCountMax matching the test parameters
+    SupervisorSpec mockSpec = Mockito.mock(SupervisorSpec.class);
+    SeekableStreamSupervisor mockSupervisor = 
Mockito.mock(SeekableStreamSupervisor.class);
+    ServiceEmitter mockEmitter = Mockito.mock(ServiceEmitter.class);
+    SeekableStreamSupervisorIOConfig mockIoConfig = 
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+    when(mockSpec.getId()).thenReturn("test-supervisor");
+    when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
+    when(mockIoConfig.getStream()).thenReturn("test-stream");
+
+    CostBasedAutoScalerConfig localConfig = CostBasedAutoScalerConfig.builder()
+                                                                      
.taskCountMax(taskCountMax)
+                                                                      
.taskCountMin(1)
+                                                                      
.enableTaskAutoScaler(true)
+                                                                      
.lagWeight(0.6)
+                                                                      
.idleWeight(0.4)
+                                                                      .build();
+
+    CostBasedAutoScaler localAutoScaler = new CostBasedAutoScaler(
+        mockSupervisor,
+        localConfig,
+        mockSpec,
+        mockEmitter
+    );
+
     class Example
     {
       final int currentTasks;
@@ -165,7 +183,7 @@ public class CostBasedAutoScalerTest
           pollIdleRatio,
           avgProcessingRate
       );
-      int actualOptimal = autoScaler.computeOptimalTaskCount(metrics);
+      int actualOptimal = localAutoScaler.computeOptimalTaskCount(metrics);
       if (actualOptimal == -1) {
         actualOptimal = example.currentTasks;
       }
@@ -193,17 +211,14 @@ public class CostBasedAutoScalerTest
   }
 
   @Test
-  public void testComputeOptimalTaskCountInvalidInputs()
+  public void testComputeOptimalTaskCount()
   {
+    // Invalid inputs return -1
     Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(null));
     Assert.assertEquals(-1, 
autoScaler.computeOptimalTaskCount(createMetrics(0.0, 10, 0, 0.0)));
     Assert.assertEquals(-1, 
autoScaler.computeOptimalTaskCount(createMetrics(100.0, 10, -5, 0.3)));
     Assert.assertEquals(-1, 
autoScaler.computeOptimalTaskCount(createMetrics(100.0, -1, 100, 0.3)));
-  }
 
-  @Test
-  public void testComputeOptimalTaskCountScaling()
-  {
     // High idle (underutilized) - should scale down
     int scaleDownResult = 
autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8));
     Assert.assertTrue("Should scale down when idle > 0.6", scaleDownResult < 
25);
@@ -236,31 +251,27 @@ public class CostBasedAutoScalerTest
     group.put("task-1", buildTaskStatsWithPollIdle(0.5));
     validStats.put("0", group);
     Assert.assertEquals(0.4, 
CostBasedAutoScaler.extractPollIdleRatio(validStats), 0.0001);
-  }
 
-  @Test
-  public void testExtractPollIdleRatioInvalidTypes()
-  {
-    // Non-map task metric
+    // Invalid types: non-map task metric
     Map<String, Map<String, Object>> nonMapTask = new HashMap<>();
     nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
     Assert.assertEquals(0., 
CostBasedAutoScaler.extractPollIdleRatio(nonMapTask), 0.0001);
 
-    // Empty autoscaler metrics
+    // Invalid types: empty autoscaler metrics
     Map<String, Map<String, Object>> emptyAutoscaler = new HashMap<>();
     Map<String, Object> taskStats1 = new HashMap<>();
     taskStats1.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, new 
HashMap<>());
     emptyAutoscaler.put("0", Collections.singletonMap("task-0", taskStats1));
     Assert.assertEquals(0., 
CostBasedAutoScaler.extractPollIdleRatio(emptyAutoscaler), 0.0001);
 
-    // Non-map autoscaler metrics
+    // Invalid types: non-map autoscaler metrics
     Map<String, Map<String, Object>> nonMapAutoscaler = new HashMap<>();
     Map<String, Object> taskStats2 = new HashMap<>();
     taskStats2.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, 
"not-a-map");
     nonMapAutoscaler.put("0", Collections.singletonMap("task-0", taskStats2));
     Assert.assertEquals(0., 
CostBasedAutoScaler.extractPollIdleRatio(nonMapAutoscaler), 0.0001);
 
-    // Non-number poll idle ratio
+    // Invalid types: non-number poll idle ratio
     Map<String, Map<String, Object>> nonNumberRatio = new HashMap<>();
     Map<String, Object> taskStats3 = new HashMap<>();
     Map<String, Object> autoscalerMetrics = new HashMap<>();
@@ -289,20 +300,10 @@ public class CostBasedAutoScalerTest
     group.put("task-1", buildTaskStatsWithMovingAverage(2000.0));
     validStats.put("0", group);
     Assert.assertEquals(1500.0, 
CostBasedAutoScaler.extractMovingAverage(validStats), 0.0001);
-  }
 
-  @Test
-  public void testExtractMovingAverageIntervalFallback()
-  {
-    // 15-minute average is preferred
+    // Interval fallback: 15-minute preferred, then 5-minute, then 1-minute
     Map<String, Map<String, Object>> fifteenMin = new HashMap<>();
-    fifteenMin.put(
-        "0",
-        Collections.singletonMap(
-            "task-0",
-            buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 
1500.0)
-        )
-    );
+    fifteenMin.put("0", Collections.singletonMap("task-0", 
buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 1500.0)));
     Assert.assertEquals(1500.0, 
CostBasedAutoScaler.extractMovingAverage(fifteenMin), 0.0001);
 
     // 1-minute as a final fallback
@@ -334,56 +335,21 @@ public class CostBasedAutoScalerTest
     nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
     Assert.assertEquals(-1., 
CostBasedAutoScaler.extractMovingAverage(nonMapTask), 0.0001);
 
-    // Missing buildSegments
     Map<String, Map<String, Object>> missingBuild = new HashMap<>();
     Map<String, Object> taskStats1 = new HashMap<>();
     taskStats1.put("movingAverages", new HashMap<>());
     missingBuild.put("0", Collections.singletonMap("task-0", taskStats1));
     Assert.assertEquals(-1., 
CostBasedAutoScaler.extractMovingAverage(missingBuild), 0.0001);
 
-    // Non-map movingAverages
     Map<String, Map<String, Object>> nonMapMA = new HashMap<>();
     Map<String, Object> taskStats2 = new HashMap<>();
     taskStats2.put("movingAverages", "not-a-map");
     nonMapMA.put("0", Collections.singletonMap("task-0", taskStats2));
     Assert.assertEquals(-1., 
CostBasedAutoScaler.extractMovingAverage(nonMapMA), 0.0001);
-
-    // Non-map buildSegments
-    Map<String, Map<String, Object>> nonMapBS = new HashMap<>();
-    Map<String, Object> taskStats3 = new HashMap<>();
-    Map<String, Object> movingAverages3 = new HashMap<>();
-    movingAverages3.put(RowIngestionMeters.BUILD_SEGMENTS, "not-a-map");
-    taskStats3.put("movingAverages", movingAverages3);
-    nonMapBS.put("0", Collections.singletonMap("task-0", taskStats3));
-    Assert.assertEquals(-1., 
CostBasedAutoScaler.extractMovingAverage(nonMapBS), 0.0001);
-
-    // Non-map interval data
-    Map<String, Map<String, Object>> nonMapInterval = new HashMap<>();
-    Map<String, Object> taskStats4 = new HashMap<>();
-    Map<String, Object> movingAverages4 = new HashMap<>();
-    Map<String, Object> buildSegments4 = new HashMap<>();
-    buildSegments4.put(FIFTEEN_MINUTE_NAME, "not-a-map");
-    movingAverages4.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments4);
-    taskStats4.put("movingAverages", movingAverages4);
-    nonMapInterval.put("0", Collections.singletonMap("task-0", taskStats4));
-    Assert.assertEquals(-1., 
CostBasedAutoScaler.extractMovingAverage(nonMapInterval), 0.0001);
-
-    // Non-number processed rate
-    Map<String, Map<String, Object>> nonNumberRate = new HashMap<>();
-    Map<String, Object> taskStats5 = new HashMap<>();
-    Map<String, Object> movingAverages5 = new HashMap<>();
-    Map<String, Object> buildSegments5 = new HashMap<>();
-    Map<String, Object> fifteenMin = new HashMap<>();
-    fifteenMin.put(RowIngestionMeters.PROCESSED, "not-a-number");
-    buildSegments5.put(FIFTEEN_MINUTE_NAME, fifteenMin);
-    movingAverages5.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments5);
-    taskStats5.put("movingAverages", movingAverages5);
-    nonNumberRate.put("0", Collections.singletonMap("task-0", taskStats5));
-    Assert.assertEquals(-1., 
CostBasedAutoScaler.extractMovingAverage(nonNumberRate), 0.0001);
   }
 
   @Test
-  public void testComputeTaskCountForRolloverReturnsMinusOneWhenSuspended()
+  public void testComputeTaskCountForRolloverAndConfigProperties()
   {
     SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
     SeekableStreamSupervisor supervisor = 
Mockito.mock(SeekableStreamSupervisor.class);
@@ -391,73 +357,40 @@ public class CostBasedAutoScalerTest
     SeekableStreamSupervisorIOConfig ioConfig = 
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
 
     when(spec.getId()).thenReturn("s-up");
-    when(spec.isSuspended()).thenReturn(true);
     when(supervisor.getIoConfig()).thenReturn(ioConfig);
     when(ioConfig.getStream()).thenReturn("stream");
 
-    CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder()
-                                                             .taskCountMax(10)
-                                                             .taskCountMin(1)
-                                                             
.enableTaskAutoScaler(true)
-                                                             .lagWeight(0.5)
-                                                             .idleWeight(0.5)
-                                                             .build();
-
-    CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg, 
spec, emitter);
-    Assert.assertEquals(-1, scaler.computeTaskCountForRollover());
-  }
-
-  @Test
-  public void testComputeTaskCountForRolloverReturnsMinusOneWhenLagStatsNull()
-  {
-    SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
-    SeekableStreamSupervisor supervisor = 
Mockito.mock(SeekableStreamSupervisor.class);
-    ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
-    SeekableStreamSupervisorIOConfig ioConfig = 
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
-
-    when(spec.getId()).thenReturn("s-up");
+    // Test config defaults for scaleDownBarrier, defaultProcessingRate, 
scaleDownDuringTaskRolloverOnly
+    CostBasedAutoScalerConfig cfgWithDefaults = 
CostBasedAutoScalerConfig.builder()
+                                                                          
.taskCountMax(10)
+                                                                          
.taskCountMin(1)
+                                                                          
.enableTaskAutoScaler(true)
+                                                                          
.build();
+    Assert.assertEquals(5, cfgWithDefaults.getScaleDownBarrier());
+    Assert.assertEquals(1000.0, cfgWithDefaults.getDefaultProcessingRate(), 
0.001);
+    Assert.assertFalse(cfgWithDefaults.isScaleDownOnTaskRolloverOnly());
+
+    // Test custom config values
+    CostBasedAutoScalerConfig cfgWithCustom = 
CostBasedAutoScalerConfig.builder()
+                                                                        
.taskCountMax(10)
+                                                                        
.taskCountMin(1)
+                                                                        
.enableTaskAutoScaler(true)
+                                                                        
.scaleDownBarrier(10)
+                                                                        
.defaultProcessingRate(5000.0)
+                                                                        
.scaleDownDuringTaskRolloverOnly(true)
+                                                                        
.build();
+    Assert.assertEquals(10, cfgWithCustom.getScaleDownBarrier());
+    Assert.assertEquals(5000.0, cfgWithCustom.getDefaultProcessingRate(), 
0.001);
+    Assert.assertTrue(cfgWithCustom.isScaleDownOnTaskRolloverOnly());
+
+    // computeTaskCountForRollover returns -1 when 
scaleDownDuringTaskRolloverOnly=false (default)
     when(spec.isSuspended()).thenReturn(false);
-    when(supervisor.computeLagStats()).thenReturn(null);
-    when(supervisor.getIoConfig()).thenReturn(ioConfig);
-    when(ioConfig.getStream()).thenReturn("stream");
-
-    CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder()
-                                                             .taskCountMax(10)
-                                                             .taskCountMin(1)
-                                                             
.enableTaskAutoScaler(true)
-                                                             .lagWeight(0.5)
-                                                             .idleWeight(0.5)
-                                                             .build();
-
-    CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg, 
spec, emitter);
+    CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, 
cfgWithDefaults, spec, emitter);
     Assert.assertEquals(-1, scaler.computeTaskCountForRollover());
-  }
-
-  @Test
-  public void testComputeTaskCountForRolloverReturnsMinusOneWhenNoMetrics()
-  {
-    // Tests the case where the lastKnownMetrics is null (no 
computeTaskCountForScaleAction called)
-    SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
-    SeekableStreamSupervisor supervisor = 
Mockito.mock(SeekableStreamSupervisor.class);
-    ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
-    SeekableStreamSupervisorIOConfig ioConfig = 
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
 
-    when(spec.getId()).thenReturn("s-up");
-    when(spec.isSuspended()).thenReturn(false);
-    when(supervisor.getIoConfig()).thenReturn(ioConfig);
-    when(ioConfig.getStream()).thenReturn("stream");
-
-    CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder()
-                                                             .taskCountMax(10)
-                                                             .taskCountMin(1)
-                                                             
.enableTaskAutoScaler(true)
-                                                             .lagWeight(0.5)
-                                                             .idleWeight(0.5)
-                                                             .build();
-
-    CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg, 
spec, emitter);
-    // Should return -1 when lastKnownMetrics is null
-    Assert.assertEquals(-1, scaler.computeTaskCountForRollover());
+    // computeTaskCountForRollover returns -1 when lastKnownMetrics is null 
(even with scaleDownDuringTaskRolloverOnly=true)
+    CostBasedAutoScaler scalerWithRolloverOnly = new 
CostBasedAutoScaler(supervisor, cfgWithCustom, spec, emitter);
+    Assert.assertEquals(-1, 
scalerWithRolloverOnly.computeTaskCountForRollover());
   }
 
   private CostMetrics createMetrics(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to