This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ff6fc385d1f Inject new task count calculation during the rollover 
(#18860)
ff6fc385d1f is described below

commit ff6fc385d1f13d9cd5044d989c53775f5553c66c
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Mon Jan 5 15:22:24 2026 +0200

    Inject new task count calculation during the rollover (#18860)
    
    Changes:
    - Scale up/down tasks during task rollover
    - Add class `CostResult`
    - Enhance cost calculation in new auto-scaler
---
 .../CostBasedAutoScalerIntegrationTest.java        |  74 ++-
 .../overlord/supervisor/SupervisorManager.java     |   2 +-
 .../supervisor/SeekableStreamSupervisor.java       |  48 +-
 .../supervisor/autoscaler/CostBasedAutoScaler.java | 186 ++++---
 .../supervisor/autoscaler/CostResult.java          |  64 +++
 .../autoscaler/WeightedCostFunction.java           |  25 +-
 .../overlord/supervisor/SupervisorManagerTest.java |  45 +-
 .../SeekableStreamSupervisorSpecTest.java          | 333 ++++++++---
 ...treamSupervisorScaleDuringTaskRolloverTest.java | 612 +++++++++++++++++++++
 .../SeekableStreamSupervisorStateTest.java         |  22 +
 .../autoscaler/CostBasedAutoScalerTest.java        | 340 ++++++++++--
 .../autoscaler/WeightedCostFunctionTest.java       |  54 +-
 .../indexing/overlord/supervisor/Supervisor.java   |   6 +
 .../autoscaler/SupervisorTaskAutoScaler.java       |  11 +
 14 files changed, 1554 insertions(+), 268 deletions(-)

diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
index c5b86688ea1..140c4626d22 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
@@ -41,7 +41,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
-import org.joda.time.Seconds;
+import org.joda.time.Period;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -59,6 +59,7 @@ import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.Cos
  * <p>
  * Tests the autoscaler's ability to compute optimal task counts based on 
partition count and cost metrics (lag and idle time).
  */
+@SuppressWarnings("resource")
 public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
 {
   private static final String TOPIC = 
EmbeddedClusterApis.createTestDatasourceName();
@@ -95,18 +96,11 @@ public class CostBasedAutoScalerIntegrationTest extends 
EmbeddedClusterTestBase
       }
     };
 
-    // Increase worker capacity to handle more tasks
     indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
-           .addProperty("druid.worker.capacity", "60");
-
-    overlord.addProperty("druid.indexer.task.default.context", 
"{\"useConcurrentLocks\": true}")
-            .addProperty("druid.manager.segments.useIncrementalCache", 
"ifSynced")
-            .addProperty("druid.manager.segments.pollDuration", "PT0.1s");
-
-    coordinator.addProperty("druid.manager.segments.useIncrementalCache", 
"ifSynced");
+           .addProperty("druid.worker.capacity", "100");
 
     cluster.useLatchableEmitter()
-           .useDefaultTimeoutForLatchableEmitter(120)
+           .useDefaultTimeoutForLatchableEmitter(60)
            .addServer(coordinator)
            .addServer(overlord)
            .addServer(indexer)
@@ -162,7 +156,6 @@ public class CostBasedAutoScalerIntegrationTest extends 
EmbeddedClusterTestBase
   }
 
   @Test
-  @Timeout(125)
   public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
   {
     final String superId = dataSource + "_super_scaleup";
@@ -215,6 +208,63 @@ public class CostBasedAutoScalerIntegrationTest extends 
EmbeddedClusterTestBase
     
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
   }
 
+  @Test
+  void test_scaleDownDuringTaskRollover()
+  {
+    final String superId = dataSource + "_super";
+    final int initialTaskCount = 10;
+
+    final CostBasedAutoScalerConfig autoScalerConfig = 
CostBasedAutoScalerConfig
+        .builder()
+        .enableTaskAutoScaler(true)
+        .taskCountMin(1)
+        .taskCountMax(10)
+        .taskCountStart(initialTaskCount)
+        .scaleActionPeriodMillis(2000)
+        .minTriggerScaleActionFrequencyMillis(2000)
+        // High idle weight ensures scale-down when tasks are mostly idle 
(little data to process)
+        .lagWeight(0.1)
+        .idleWeight(0.9)
+        .build();
+
+    final KafkaSupervisorSpec spec = 
createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, 
initialTaskCount);
+
+    // Submit the supervisor
+    Assertions.assertEquals(superId, cluster.callApi().postSupervisor(spec));
+
+    // Wait for at least one task running for the datasource managed by the 
supervisor.
+    overlord.latchableEmitter().waitForEvent(e -> 
e.hasMetricName("task/run/time")
+                                                   
.hasDimension(DruidMetrics.DATASOURCE, dataSource));
+
+    // Wait for the autoscaler to emit a metric indicating scale-down; the optimal task count must be less than the current task count.
+    overlord.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
+                      .hasValueMatching(Matchers.lessThan((long) 
initialTaskCount)));
+
+    // Wait for tasks to complete (first rollover)
+    overlord.latchableEmitter().waitForEvent(e -> 
e.hasMetricName("task/action/success/count"));
+
+    // Wait again for a task running for the datasource managed by the supervisor.
+    overlord.latchableEmitter().waitForEvent(e -> 
e.hasMetricName("task/run/time")
+                                                   
.hasDimension(DruidMetrics.DATASOURCE, dataSource));
+
+    // After rollover, verify that the running task count has decreased
+    // The autoscaler should have recommended fewer tasks due to high idle time
+    final int postRolloverRunningTasks = 
cluster.callApi().getTaskCount("running", dataSource);
+
+    Assertions.assertTrue(
+        postRolloverRunningTasks < initialTaskCount,
+        StringUtils.format(
+            "Expected running task count to decrease after rollover. Initial: 
%d, After rollover: %d",
+            initialTaskCount,
+            postRolloverRunningTasks
+        )
+    );
+
+    // Suspend the supervisor to clean up
+    cluster.callApi().postSupervisor(spec.createSuspendedSpec());
+  }
+
   private void produceRecordsToKafka(int recordCount, int iterations)
   {
     int recordCountPerSlice = recordCount / iterations;
@@ -258,7 +308,7 @@ public class CostBasedAutoScalerIntegrationTest extends 
EmbeddedClusterTestBase
             ioConfig -> ioConfig
                 .withConsumerProperties(kafkaServer.consumerProperties())
                 .withTaskCount(taskCount)
-                .withTaskDuration(Seconds.THREE.toPeriod())
+                .withTaskDuration(Period.seconds(7))
                 .withAutoScalerConfig(autoScalerConfig)
         )
         .withId(supervisorId)
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 21d2a626501..0cd799bed54 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -502,7 +502,7 @@ public class SupervisorManager
     SupervisorTaskAutoScaler autoscaler;
     try {
       supervisor = spec.createSupervisor();
-      autoscaler = spec.createAutoscaler(supervisor);
+      autoscaler = supervisor.createAutoscaler(spec);
 
       supervisor.start();
       if (autoscaler != null) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 90adc68c799..c5d3ffe873d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -61,8 +61,10 @@ import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.StreamSupervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
@@ -885,7 +887,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   /**
    * Tag for identifying this supervisor in thread-names, listeners, etc. tag 
= (type + supervisorId).
-  */
+   */
   private final String supervisorTag;
   private final TaskInfoProvider taskInfoProvider;
   private final RowIngestionMetersFactory rowIngestionMetersFactory;
@@ -926,6 +928,12 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   private volatile boolean lifecycleStarted = false;
   private final ServiceEmitter emitter;
 
+  /**
+   * Reference to the autoscaler, consulted to scale the task count up or down during task rollover.
+   * Set in {@link #createAutoscaler}, which {@link SupervisorManager} calls after creating the supervisor.
+   */
+  private volatile SupervisorTaskAutoScaler taskAutoScaler;
+
   // snapshots latest sequences from the stream to be verified in the next run 
cycle of inactive stream check
   private final Map<PartitionIdType, SequenceOffsetType> 
previousSequencesFromStream = new HashMap<>();
   private long lastActiveTimeMillis;
@@ -1306,7 +1314,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                     if (log.isDebugEnabled()) {
                       log.debug(
                           "Handled notice[%s] from notices queue in [%d] ms, "
-                              + "current notices queue size [%d] for 
supervisor[%s] for datasource[%s].",
+                          + "current notices queue size [%d] for 
supervisor[%s] for datasource[%s].",
                           noticeType, noticeHandleTime.millisElapsed(), 
getNoticesQueueSize(), supervisorId, dataSource
                       );
                     }
@@ -1677,6 +1685,13 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     return limitedParseErrors;
   }
 
+  @Override
+  public SupervisorTaskAutoScaler createAutoscaler(SupervisorSpec spec)
+  {
+    this.taskAutoScaler = spec.createAutoscaler(this);
+    return this.taskAutoScaler;
+  }
+
   @VisibleForTesting
   public TaskGroup addTaskGroupToActivelyReadingTaskGroup(
       int taskGroupId,
@@ -3428,6 +3443,29 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       // remove this task group from the list of current task groups now that 
it has been handled
       activelyReadingTaskGroups.remove(groupId);
     }
+
+    maybeScaleDuringTaskRollover();
+  }
+
+  /**
+   * Scales up or down the number of tasks during a task rollover, if applicable.
+   * <p>
+   * This method is invoked to determine whether a task count adjustment is needed
+   * during a task rollover based on the recommendations from the task auto-scaler.
+   */
+  @VisibleForTesting
+  void maybeScaleDuringTaskRollover()
+  {
+    if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
+      int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
+      if (rolloverTaskCount > 0) {
+        log.info("Autoscaler recommends scaling down to [%d] tasks during 
rollover", rolloverTaskCount);
+        changeTaskCountInIOConfig(rolloverTaskCount);
+        // Here force reset the supervisor state to be re-calculated on the 
next iteration of runInternal() call.
+        // This seems the best way to inject task amount recalculation during 
the rollover.
+        clearAllocationInfo();
+      }
+    }
   }
 
   private DateTime computeEarliestTaskStartTime(TaskGroup group)
@@ -4688,9 +4726,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
         // Try emitting lag even with stale metrics provided that none of the 
partitions has negative lag
         final long staleMillis = sequenceLastUpdated == null
-            ? 0
-            : DateTimes.nowUtc().getMillis()
-              - (tuningConfig.getOffsetFetchPeriod().getMillis() + 
sequenceLastUpdated.getMillis());
+                                 ? 0
+                                 : DateTimes.nowUtc().getMillis()
+                                   - 
(tuningConfig.getOffsetFetchPeriod().getMillis() + 
sequenceLastUpdated.getMillis());
         if (staleMillis > 0 && partitionLags.values().stream().anyMatch(x -> x 
< 0)) {
           // Log at most once every twenty supervisor runs to reduce noise in 
the logs
           if ((staleMillis / getIoConfig().getPeriod().getMillis()) % 20 == 0) 
{
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 9ce08f93660..14e3ca2cf5e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -33,10 +33,9 @@ import 
org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -58,8 +57,8 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
 
   private static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
   private static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = 
MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
-  public static final String AVG_LAG_METRIC = 
"task/autoScaler/costBased/avgLag";
-  public static final String AVG_IDLE_METRIC = 
"task/autoScaler/costBased/pollIdleAvg";
+  public static final String LAG_COST_METRIC = 
"task/autoScaler/costBased/lagCost";
+  public static final String IDLE_COST_METRIC = 
"task/autoScaler/costBased/idleCost";
   public static final String OPTIMAL_TASK_COUNT_METRIC = 
"task/autoScaler/costBased/optimalTaskCount";
 
   private final String supervisorId;
@@ -70,6 +69,7 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
   private final ServiceMetricEvent.Builder metricBuilder;
   private final ScheduledExecutorService autoscalerExecutor;
   private final WeightedCostFunction costFunction;
+  private volatile CostMetrics lastKnownMetrics;
 
   public CostBasedAutoScaler(
       SeekableStreamSupervisor supervisor,
@@ -86,7 +86,8 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
 
     this.costFunction = new WeightedCostFunction();
 
-    this.autoscalerExecutor = 
Execs.scheduledSingleThreaded("CostBasedAutoScaler-" + 
StringUtils.encodeForFormat(spec.getId()));
+    this.autoscalerExecutor = 
Execs.scheduledSingleThreaded("CostBasedAutoScaler-"
+                                                            + 
StringUtils.encodeForFormat(spec.getId()));
     this.metricBuilder = ServiceMetricEvent.builder()
                                            
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
                                            .setDimension(
@@ -98,12 +99,8 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
   @Override
   public void start()
   {
-    Callable<Integer> scaleAction = () -> 
computeOptimalTaskCount(this.collectMetrics());
-    Runnable onSuccessfulScale = () -> {
-    };
-
     autoscalerExecutor.scheduleAtFixedRate(
-        supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, 
emitter),
+        
supervisor.buildDynamicAllocationTask(this::computeTaskCountForScaleAction, () 
-> {}, emitter),
         config.getScaleActionPeriodMillis(),
         config.getScaleActionPeriodMillis(),
         TimeUnit.MILLISECONDS
@@ -129,46 +126,25 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     // No-op.
   }
 
-  private CostMetrics collectMetrics()
+  @Override
+  public int computeTaskCountForRollover()
   {
-    if (spec.isSuspended()) {
-      log.debug("Supervisor [%s] is suspended, skipping a metrics collection", 
supervisorId);
-      return null;
-    }
-
-    final LagStats lagStats = supervisor.computeLagStats();
-    if (lagStats == null) {
-      log.debug("Lag stats unavailable for supervisorId [%s], skipping 
collection", supervisorId);
-      return null;
-    }
-
-    final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
-    final int partitionCount = supervisor.getPartitionCount();
-
-    final Map<String, Map<String, Object>> taskStats = supervisor.getStats();
-    final double movingAvgRate = extractMovingAverage(taskStats, 
DropwizardRowIngestionMeters.ONE_MINUTE_NAME);
-    final double pollIdleRatio = extractPollIdleRatio(taskStats);
+    return computeOptimalTaskCount(lastKnownMetrics);
+  }
 
-    final double avgPartitionLag = lagStats.getAvgLag();
+  public int computeTaskCountForScaleAction()
+  {
+    lastKnownMetrics = collectMetrics();
+    final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
+    final int currentTaskCount = lastKnownMetrics.getCurrentTaskCount();
 
-    // Use an actual 15-minute moving average processing rate if available
-    final double avgProcessingRate;
-    if (movingAvgRate > 0) {
-      avgProcessingRate = movingAvgRate;
-    } else {
-      // Fallback: estimate processing rate based on idle ratio
-      final double utilizationRatio = Math.max(0.01, 1.0 - pollIdleRatio);
-      avgProcessingRate = config.getDefaultProcessingRate() * utilizationRatio;
-    }
+    // Perform only scale-up actions
+    return optimalTaskCount >= currentTaskCount ? optimalTaskCount : -1;
+  }
 
-    return new CostMetrics(
-        avgPartitionLag,
-        currentTaskCount,
-        partitionCount,
-        pollIdleRatio,
-        supervisor.getIoConfig().getTaskDuration().getStandardSeconds(),
-        avgProcessingRate
-    );
+  public CostBasedAutoScalerConfig getConfig()
+  {
+    return config;
   }
 
   /**
@@ -184,7 +160,7 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
    *
    * @return optimal task count for scale-up, or -1 if no scaling action needed
    */
-  public int computeOptimalTaskCount(CostMetrics metrics)
+  int computeOptimalTaskCount(CostMetrics metrics)
   {
     if (metrics == null) {
       log.debug("No metrics available yet for supervisorId [%s]", 
supervisorId);
@@ -204,33 +180,28 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
       return -1;
     }
 
-    // If idle is already in the ideal range [0.2, 0.6], optimal utilization 
has been achieved.
-    // No scaling is needed - maintain stability by staying at the current 
task count.
-    final double currentIdleRatio = metrics.getPollIdleRatio();
-    if (currentIdleRatio >= 0 && 
WeightedCostFunction.isIdleInIdealRange(currentIdleRatio)) {
-      log.debug(
-          "Idle ratio [%.3f] is in ideal range for supervisorId [%s], no 
scaling needed",
-          currentIdleRatio,
-          supervisorId
-      );
-      return -1;
-    }
-
     int optimalTaskCount = -1;
-    double optimalCost = Double.POSITIVE_INFINITY;
+    CostResult optimalCost = new CostResult();
 
     for (int taskCount : validTaskCounts) {
-      double cost = costFunction.computeCost(metrics, taskCount, config);
-      log.debug("Proposed task count: %d, Cost: %.4f", taskCount, cost);
-      if (cost < optimalCost) {
+      CostResult costResult = costFunction.computeCost(metrics, taskCount, 
config);
+      double cost = costResult.totalCost();
+      log.debug(
+          "Proposed task count: %d, Cost: %.4f (lag: %.4f, idle: %.4f)",
+          taskCount,
+          cost,
+          costResult.lagCost(),
+          costResult.idleCost()
+      );
+      if (cost < optimalCost.totalCost()) {
         optimalTaskCount = taskCount;
-        optimalCost = cost;
+        optimalCost = costResult;
       }
     }
 
-    emitter.emit(metricBuilder.setMetric(AVG_LAG_METRIC, 
metrics.getAvgPartitionLag()));
-    emitter.emit(metricBuilder.setMetric(AVG_IDLE_METRIC, 
metrics.getPollIdleRatio()));
     emitter.emit(metricBuilder.setMetric(OPTIMAL_TASK_COUNT_METRIC, (long) 
optimalTaskCount));
+    emitter.emit(metricBuilder.setMetric(LAG_COST_METRIC, 
optimalCost.lagCost()));
+    emitter.emit(metricBuilder.setMetric(IDLE_COST_METRIC, 
optimalCost.idleCost()));
 
     log.debug(
         "Cost-based scaling evaluation for supervisorId [%s]: current=%d, 
optimal=%d, cost=%.4f, "
@@ -238,7 +209,7 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
         supervisorId,
         metrics.getCurrentTaskCount(),
         optimalTaskCount,
-        optimalCost,
+        optimalCost.totalCost(),
         metrics.getAvgPartitionLag(),
         metrics.getPollIdleRatio()
     );
@@ -246,8 +217,8 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     if (optimalTaskCount == currentTaskCount) {
       return -1;
     }
-    // Perform both scale-up and scale-down proactively
-    // Future versions may perform scale-down on task rollover only
+
+    // Scale up is performed eagerly.
     return optimalTaskCount;
   }
 
@@ -264,26 +235,22 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
       return new int[]{};
     }
 
-    List<Integer> result = new ArrayList<>();
+    Set<Integer> result = new HashSet<>();
     final int currentPartitionsPerTask = partitionCount / currentTaskCount;
     // Minimum partitions per task correspond to the maximum number of tasks 
(scale up) and vice versa.
     final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - 
MAX_INCREASE_IN_PARTITIONS_PER_TASK);
-    final int maxPartitionsPerTask = Math.min(partitionCount, 
currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK);
+    final int maxPartitionsPerTask = Math.min(
+        partitionCount,
+        currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
+    );
 
     for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= 
minPartitionsPerTask; partitionsPerTask--) {
       final int taskCount = (partitionCount + partitionsPerTask - 1) / 
partitionsPerTask;
-      if (result.isEmpty() || result.get(result.size() - 1) != taskCount) {
-        result.add(taskCount);
-      }
+      result.add(taskCount);
     }
     return result.stream().mapToInt(Integer::intValue).toArray();
   }
 
-  public CostBasedAutoScalerConfig getConfig()
-  {
-    return config;
-  }
-
   /**
    * Extracts the average poll-idle-ratio metric from task stats.
    * This metric indicates how much time the consumer spends idle waiting for 
data.
@@ -324,9 +291,9 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
    *
    * @param taskStats the stats map from supervisor.getStats()
    * @return the average 15-minute processing rate across all tasks in 
records/second,
-   *         or -1 if no valid metrics are available
+   * or -1 if no valid metrics are available
    */
-  static double extractMovingAverage(Map<String, Map<String, Object>> 
taskStats, String movingAveragePeriodKey)
+  static double extractMovingAverage(Map<String, Map<String, Object>> 
taskStats)
   {
     if (taskStats == null || taskStats.isEmpty()) {
       return -1;
@@ -341,9 +308,15 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
           if (movingAveragesObj instanceof Map) {
             Object buildSegmentsObj = ((Map<?, ?>) 
movingAveragesObj).get(RowIngestionMeters.BUILD_SEGMENTS);
             if (buildSegmentsObj instanceof Map) {
-              Object fifteenMinObj = ((Map<?, ?>) 
buildSegmentsObj).get(movingAveragePeriodKey);
-              if (fifteenMinObj instanceof Map) {
-                Object processedRate = ((Map<?, ?>) 
fifteenMinObj).get(RowIngestionMeters.PROCESSED);
+              Object movingAvgObj = ((Map<?, ?>) 
buildSegmentsObj).get(DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME);
+              if (movingAvgObj == null) {
+                movingAvgObj = ((Map<?, ?>) 
buildSegmentsObj).get(DropwizardRowIngestionMeters.FIVE_MINUTE_NAME);
+                if (movingAvgObj == null) {
+                  movingAvgObj = ((Map<?, ?>) 
buildSegmentsObj).get(DropwizardRowIngestionMeters.ONE_MINUTE_NAME);
+                }
+              }
+              if (movingAvgObj instanceof Map) {
+                Object processedRate = ((Map<?, ?>) 
movingAvgObj).get(RowIngestionMeters.PROCESSED);
                 if (processedRate instanceof Number) {
                   sum += ((Number) processedRate).doubleValue();
                   count++;
@@ -357,4 +330,47 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
 
     return count > 0 ? sum / count : -1;
   }
+
+  private CostMetrics collectMetrics()
+  {
+    if (spec.isSuspended()) {
+      log.debug("Supervisor [%s] is suspended, skipping a metrics collection", 
supervisorId);
+      return null;
+    }
+
+    final LagStats lagStats = supervisor.computeLagStats();
+    if (lagStats == null) {
+      log.debug("Lag stats unavailable for supervisorId [%s], skipping 
collection", supervisorId);
+      return null;
+    }
+
+    final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+    final int partitionCount = supervisor.getPartitionCount();
+
+    final Map<String, Map<String, Object>> taskStats = supervisor.getStats();
+    final double movingAvgRate = extractMovingAverage(taskStats);
+    final double pollIdleRatio = extractPollIdleRatio(taskStats);
+
+    final double avgPartitionLag = lagStats.getAvgLag();
+
+    // Use the measured moving-average processing rate (15-minute window, falling back to 5- or 1-minute) if available
+    final double avgProcessingRate;
+    if (movingAvgRate > 0) {
+      avgProcessingRate = movingAvgRate;
+    } else {
+      // Fallback: estimate processing rate based on the idle ratio
+      final double utilizationRatio = Math.max(0.01, 1.0 - pollIdleRatio);
+      avgProcessingRate = config.getDefaultProcessingRate() * utilizationRatio;
+    }
+
+    return new CostMetrics(
+        avgPartitionLag,
+        currentTaskCount,
+        partitionCount,
+        pollIdleRatio,
+        supervisor.getIoConfig().getTaskDuration().getStandardSeconds(),
+        avgProcessingRate
+    );
+  }
+
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
new file mode 100644
index 00000000000..42096dd61e1
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+/**
+ * Holds the result of a cost computation from {@link 
WeightedCostFunction#computeCost}.
+ * All costs are measured in seconds.
+ */
+public class CostResult
+{
+
+  private final double totalCost;
+  private final double lagCost;
+  private final double idleCost;
+
+  public CostResult()
+  {
+    this(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, 
Double.POSITIVE_INFINITY);
+  }
+
+  /**
+   * @param totalCost the weighted sum of lagCost and idleCost
+   * @param lagCost   the weighted cost representing expected time (seconds) 
to recover current lag
+   * @param idleCost  the weighted cost representing total compute time 
(seconds) wasted being idle per task duration
+   */
+  public CostResult(double totalCost, double lagCost, double idleCost)
+  {
+    this.totalCost = totalCost;
+    this.lagCost = lagCost;
+    this.idleCost = idleCost;
+  }
+
+  public double totalCost()
+  {
+    return totalCost;
+  }
+
+  public double lagCost()
+  {
+    return lagCost;
+  }
+
+  public double idleCost()
+  {
+    return idleCost;
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
index 1af8233527a..0da733ef9e7 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
@@ -58,14 +58,15 @@ public class WeightedCostFunction
    * </ul>
    * <p>
    * Formula: {@code lagWeight * lagRecoveryTime + idleWeight * idlenessCost}.
-   * This approach directly connects costs to operational metricsю
+   * This approach directly connects costs to operational metrics.
    *
-   * @return cost score in seconds, or {@link Double#POSITIVE_INFINITY} for 
invalid inputs
+   * @return CostResult containing totalCost, lagCost, and idleCost,
+   * or result with {@link Double#POSITIVE_INFINITY} for invalid inputs
    */
-  public double computeCost(CostMetrics metrics, int proposedTaskCount, 
CostBasedAutoScalerConfig config)
+  public CostResult computeCost(CostMetrics metrics, int proposedTaskCount, 
CostBasedAutoScalerConfig config)
   {
     if (metrics == null || config == null || proposedTaskCount <= 0 || 
metrics.getPartitionCount() <= 0) {
-      return Double.POSITIVE_INFINITY;
+      return new CostResult(Double.POSITIVE_INFINITY, 
Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY);
     }
 
     final double avgProcessingRate = metrics.getAvgProcessingRate();
@@ -74,9 +75,9 @@ public class WeightedCostFunction
       // Metrics are unavailable - favor maintaining the current task count.
       // We're conservative about scale up, but won't let an unlikey scale 
down to happen.
       if (proposedTaskCount == metrics.getCurrentTaskCount()) {
-        return 0.01d;
+        return new CostResult(0.01d, 0.0, 0.0);
       } else {
-        return Double.POSITIVE_INFINITY;
+        return new CostResult(Double.POSITIVE_INFINITY, 
Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY);
       }
     } else {
       // Lag recovery time is decreasing by adding tasks and increasing by 
ejecting tasks.
@@ -86,19 +87,21 @@ public class WeightedCostFunction
 
     final double predictedIdleRatio = estimateIdleRatio(metrics, 
proposedTaskCount);
     final double idleCost = proposedTaskCount * 
metrics.getTaskDurationSeconds() * predictedIdleRatio;
-    final double cost = config.getLagWeight() * lagRecoveryTime + 
config.getIdleWeight() * idleCost;
+    final double lagCost = config.getLagWeight() * lagRecoveryTime;
+    final double weightedIdleCost = config.getIdleWeight() * idleCost;
+    final double cost = lagCost + weightedIdleCost;
 
     log.debug(
-        "Cost for taskCount[%d]: lagRecoveryTime[%.2fs], idleCost[%.2fs], "
+        "Cost for taskCount[%d]: lagCost[%.2fs], idleCost[%.2fs], "
         + "predictedIdle[%.3f], finalCost[%.2fs]",
         proposedTaskCount,
-        lagRecoveryTime,
-        idleCost,
+        lagCost,
+        weightedIdleCost,
         predictedIdleRatio,
         cost
     );
 
-    return cost;
+    return new CostResult(cost, lagCost, weightedIdleCost);
   }
 
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index b61c93af3c8..7794a798473 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -102,7 +102,9 @@ public class SupervisorManagerTest extends EasyMockSupport
     
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
     metadataSupervisorManager.insert("id1", spec);
     supervisor3.start();
+    
EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     supervisor1.start();
+    
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     replayAll();
 
     manager.start();
@@ -115,6 +117,7 @@ public class SupervisorManagerTest extends EasyMockSupport
 
     resetAll();
     supervisor2.start();
+    
EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     supervisor1.stop(true);
     replayAll();
 
@@ -164,6 +167,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
     metadataSupervisorManager.insert("id1", spec);
     supervisor1.start();
+    
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     replayAll();
 
     manager.start();
@@ -225,6 +229,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     );
     
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
     supervisor1.start();
+    
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     replayAll();
     manager.start();
     Assert.assertFalse(manager.shouldUpdateSupervisor(spec));
@@ -249,6 +254,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     );
     
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
     supervisor1.start();
+    
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     exception.expect(DruidException.class);
     replayAll();
     manager.start();
@@ -332,6 +338,7 @@ public class SupervisorManagerTest extends EasyMockSupport
 
     
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
     supervisor1.start();
+    
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     EasyMock.expect(supervisor1.getStatus()).andReturn(report);
     replayAll();
 
@@ -352,6 +359,7 @@ public class SupervisorManagerTest extends EasyMockSupport
 
     
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
     supervisor1.start();
+    
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     supervisor1.handoffTaskGroupsEarly(ImmutableList.of(1));
 
     replayAll();
@@ -373,6 +381,7 @@ public class SupervisorManagerTest extends EasyMockSupport
 
     
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
     supervisor3.start();
+    
EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
 
     replayAll();
 
@@ -414,6 +423,8 @@ public class SupervisorManagerTest extends EasyMockSupport
 
     
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
     supervisor3.start();
+    
EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
+    
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     supervisor1.start();
     EasyMock.expectLastCall().andThrow(new RuntimeException("supervisor 
explosion"));
     replayAll();
@@ -433,7 +444,9 @@ public class SupervisorManagerTest extends EasyMockSupport
     
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap());
     metadataSupervisorManager.insert(EasyMock.eq("id1"), 
EasyMock.capture(capturedInsert));
     supervisor1.start();
+    
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     supervisor1.stop(true);
+    
EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     supervisor2.start();
     EasyMock.expectLastCall().andThrow(new RuntimeException("supervisor failed 
to start"));
     replayAll();
@@ -461,6 +474,7 @@ public class SupervisorManagerTest extends EasyMockSupport
 
     
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
     supervisor1.start();
+    
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     supervisor1.stopAsync();
     EasyMock.expectLastCall().andThrow(new RuntimeException("RTE"));
     replayAll();
@@ -479,6 +493,7 @@ public class SupervisorManagerTest extends EasyMockSupport
 
     
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
     supervisor1.start();
+    
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     supervisor1.reset(EasyMock.anyObject(DataSourceMetadata.class));
     replayAll();
 
@@ -498,6 +513,7 @@ public class SupervisorManagerTest extends EasyMockSupport
 
     
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
     supervisor3.start();
+    
EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     replayAll();
 
     manager.start();
@@ -533,6 +549,7 @@ public class SupervisorManagerTest extends EasyMockSupport
 
     
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
     supervisor1.start();
+    
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     supervisor1.resetOffsets(datasourceMetadata);
     replayAll();
 
@@ -558,7 +575,9 @@ public class SupervisorManagerTest extends EasyMockSupport
     
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
     metadataSupervisorManager.insert("id1", spec);
     supervisor3.start();
+    
EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     supervisor1.start();
+    
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     replayAll();
 
     manager.start();
@@ -574,6 +593,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     resetAll();
     metadataSupervisorManager.insert(EasyMock.eq("id1"), 
EasyMock.capture(capturedInsert));
     supervisor2.start();
+    
EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     supervisor1.stop(true);
     replayAll();
 
@@ -589,6 +609,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     metadataSupervisorManager.insert(EasyMock.eq("id1"), 
EasyMock.capture(capturedInsert));
     supervisor2.stop(true);
     supervisor1.start();
+    
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     replayAll();
 
     manager.suspendOrResumeSupervisor("id1", false);
@@ -631,29 +652,29 @@ public class SupervisorManagerTest extends EasyMockSupport
     metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
 
     SeekableStreamSupervisorSpec suspendedSpec = 
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
-    Supervisor suspendedSupervisor = 
EasyMock.mock(SeekableStreamSupervisor.class);
+    Supervisor suspendedSupervisor = 
EasyMock.createNiceMock(SeekableStreamSupervisor.class);
     
EasyMock.expect(suspendedSpec.getId()).andReturn("suspendedSpec").anyTimes();
     EasyMock.expect(suspendedSpec.isSuspended()).andReturn(true).anyTimes();
     
EasyMock.expect(suspendedSpec.getDataSources()).andReturn(ImmutableList.of("suspendedDS")).anyTimes();
     
EasyMock.expect(suspendedSpec.createSupervisor()).andReturn(suspendedSupervisor).anyTimes();
     
EasyMock.expect(suspendedSpec.createAutoscaler(suspendedSupervisor)).andReturn(null).anyTimes();
     EasyMock.expect(suspendedSpec.getContext()).andReturn(null).anyTimes();
-    EasyMock.replay(suspendedSpec);
+    EasyMock.replay(suspendedSpec, suspendedSupervisor);
     metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
 
     SeekableStreamSupervisorSpec activeSpec = 
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
-    Supervisor activeSupervisor = 
EasyMock.mock(SeekableStreamSupervisor.class);
+    Supervisor activeSupervisor = 
EasyMock.createNiceMock(SeekableStreamSupervisor.class);
     EasyMock.expect(activeSpec.getId()).andReturn("activeSpec").anyTimes();
     EasyMock.expect(activeSpec.isSuspended()).andReturn(false).anyTimes();
     
EasyMock.expect(activeSpec.getDataSources()).andReturn(ImmutableList.of("activeDS")).anyTimes();
     
EasyMock.expect(activeSpec.createSupervisor()).andReturn(activeSupervisor).anyTimes();
     
EasyMock.expect(activeSpec.createAutoscaler(activeSupervisor)).andReturn(null).anyTimes();
     EasyMock.expect(activeSpec.getContext()).andReturn(null).anyTimes();
-    EasyMock.replay(activeSpec);
+    EasyMock.replay(activeSpec, activeSupervisor);
     metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
 
     SeekableStreamSupervisorSpec activeSpecWithConcurrentLocks = 
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
-    Supervisor activeSupervisorWithConcurrentLocks = 
EasyMock.mock(SeekableStreamSupervisor.class);
+    Supervisor activeSupervisorWithConcurrentLocks = 
EasyMock.createNiceMock(SeekableStreamSupervisor.class);
     
EasyMock.expect(activeSpecWithConcurrentLocks.getId()).andReturn("activeSpecWithConcurrentLocks").anyTimes();
     
EasyMock.expect(activeSpecWithConcurrentLocks.isSuspended()).andReturn(false).anyTimes();
     EasyMock.expect(activeSpecWithConcurrentLocks.getDataSources())
@@ -664,11 +685,11 @@ public class SupervisorManagerTest extends EasyMockSupport
             .andReturn(null).anyTimes();
     EasyMock.expect(activeSpecWithConcurrentLocks.getContext())
             .andReturn(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, 
true)).anyTimes();
-    EasyMock.replay(activeSpecWithConcurrentLocks);
+    EasyMock.replay(activeSpecWithConcurrentLocks, 
activeSupervisorWithConcurrentLocks);
     metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
 
     SeekableStreamSupervisorSpec activeAppendSpec = 
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
-    Supervisor activeAppendSupervisor = 
EasyMock.mock(SeekableStreamSupervisor.class);
+    Supervisor activeAppendSupervisor = 
EasyMock.createNiceMock(SeekableStreamSupervisor.class);
     
EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes();
     
EasyMock.expect(activeAppendSpec.isSuspended()).andReturn(false).anyTimes();
     
EasyMock.expect(activeAppendSpec.getDataSources()).andReturn(ImmutableList.of("activeAppendDS")).anyTimes();
@@ -678,12 +699,12 @@ public class SupervisorManagerTest extends EasyMockSupport
         Tasks.TASK_LOCK_TYPE,
         TaskLockType.APPEND.name()
     )).anyTimes();
-    EasyMock.replay(activeAppendSpec);
+    EasyMock.replay(activeAppendSpec, activeAppendSupervisor);
     metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
 
     // A supervisor with useConcurrentLocks set to false explicitly must not 
use an append lock
     SeekableStreamSupervisorSpec specWithUseConcurrentLocksFalse = 
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
-    Supervisor supervisorWithUseConcurrentLocksFalse = 
EasyMock.mock(SeekableStreamSupervisor.class);
+    Supervisor supervisorWithUseConcurrentLocksFalse = 
EasyMock.createNiceMock(SeekableStreamSupervisor.class);
     
EasyMock.expect(specWithUseConcurrentLocksFalse.getId()).andReturn("useConcurrentLocksFalse").anyTimes();
     
EasyMock.expect(specWithUseConcurrentLocksFalse.isSuspended()).andReturn(false).anyTimes();
     EasyMock.expect(specWithUseConcurrentLocksFalse.getDataSources())
@@ -697,7 +718,7 @@ public class SupervisorManagerTest extends EasyMockSupport
         Tasks.TASK_LOCK_TYPE,
         TaskLockType.APPEND.name()
     )).anyTimes();
-    EasyMock.replay(specWithUseConcurrentLocksFalse);
+    EasyMock.replay(specWithUseConcurrentLocksFalse, 
supervisorWithUseConcurrentLocksFalse);
     metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
 
     replayAll();
@@ -737,7 +758,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
 
     SeekableStreamSupervisorSpec streamingSpec = 
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
-    SeekableStreamSupervisor streamSupervisor = 
EasyMock.mock(SeekableStreamSupervisor.class);
+    SeekableStreamSupervisor streamSupervisor = 
EasyMock.createNiceMock(SeekableStreamSupervisor.class);
     streamSupervisor.registerNewVersionOfPendingSegment(EasyMock.anyObject());
     EasyMock.expectLastCall().once();
     EasyMock.expect(streamingSpec.getId()).andReturn("sss").anyTimes();
@@ -746,7 +767,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     
EasyMock.expect(streamingSpec.createSupervisor()).andReturn(streamSupervisor).anyTimes();
     
EasyMock.expect(streamingSpec.createAutoscaler(streamSupervisor)).andReturn(null).anyTimes();
     EasyMock.expect(streamingSpec.getContext()).andReturn(null).anyTimes();
-    EasyMock.replay(streamingSpec);
+    EasyMock.replay(streamingSpec, streamSupervisor);
     metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
     EasyMock.expectLastCall().once();
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 58ffb8438e0..f0b15f8c8a0 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -37,6 +37,7 @@ import 
org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
@@ -62,6 +63,7 @@ import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.metadata.MetadataSupervisorManager;
 import org.apache.druid.metadata.TestSupervisorSpec;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -678,6 +680,103 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     Assert.assertTrue(autoscaler4 instanceof NoopTaskAutoScaler);
   }
 
+  @Test
+  public void 
testAutoScalerReturnsNoopWhenSupervisorIsNotSeekableStreamSupervisor()
+  {
+    // Test the branch where supervisor instanceof SeekableStreamSupervisor is 
false
+    HashMap<String, Object> autoScalerConfig = new HashMap<>();
+    autoScalerConfig.put("enableTaskAutoScaler", true);
+    autoScalerConfig.put("taskCountMax", 8);
+    autoScalerConfig.put("taskCountMin", 1);
+
+    
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
+            .andReturn(mapper.convertValue(autoScalerConfig, 
AutoScalerConfig.class))
+            .anyTimes();
+    
EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes();
+    EasyMock.replay(seekableStreamSupervisorIOConfig);
+
+    
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
+    
EasyMock.expect(supervisor4.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    EasyMock.replay(supervisor4);
+
+    TestSeekableStreamSupervisorSpec spec = new 
TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        null,
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    );
+
+    // Create a non-SeekableStreamSupervisor mock
+    Supervisor nonSeekableStreamSupervisor = EasyMock.mock(Supervisor.class);
+    EasyMock.replay(nonSeekableStreamSupervisor);
+
+    // When passing a non-SeekableStreamSupervisor, should return 
NoopTaskAutoScaler
+    SupervisorTaskAutoScaler autoscaler = 
spec.createAutoscaler(nonSeekableStreamSupervisor);
+    Assert.assertTrue(
+        "Should return NoopTaskAutoScaler when supervisor is not 
SeekableStreamSupervisor",
+        autoscaler instanceof NoopTaskAutoScaler
+    );
+  }
+
+  @Test
+  public void testAutoScalerReturnsNoopWhenConfigIsNull()
+  {
+    // Test the branch where autoScalerConfig is null
+    
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
+            .andReturn(null)
+            .anyTimes();
+    
EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes();
+    EasyMock.replay(seekableStreamSupervisorIOConfig);
+
+    
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
+    
EasyMock.expect(supervisor4.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    EasyMock.replay(supervisor4);
+
+    TestSeekableStreamSupervisorSpec spec = new 
TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        null,
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    );
+
+    // When autoScalerConfig is null, should return NoopTaskAutoScaler
+    SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4);
+    Assert.assertTrue(
+        "Should return NoopTaskAutoScaler when autoScalerConfig is null",
+        autoscaler instanceof NoopTaskAutoScaler
+    );
+  }
+
   @Test
   public void testDefaultAutoScalerConfigCreatedWithDefault()
   {
@@ -857,71 +956,6 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     autoScaler.stop();
   }
 
-  @Test
-  public void 
test_dynamicAllocationNotice_skipsScalingAndEmitsReason_ifTasksArePublishing() 
throws InterruptedException
-  {
-    EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
-    
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
-    
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
-    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, 
true)).anyTimes();
-    
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
-    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
-    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
-    EasyMock.replay(spec);
-
-    
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
-    
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
-    
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
-    EasyMock.replay(ingestionSchema);
-
-    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
-    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
-    EasyMock.replay(taskMaster);
-
-    StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
-    TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(10);
-
-    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
-        supervisor,
-        DATASOURCE,
-        mapper.convertValue(
-            getScaleOutProperties(2),
-            LagBasedAutoScalerConfig.class
-        ),
-        spec,
-        dynamicActionEmitter
-    );
-
-    supervisor.addTaskGroupToPendingCompletionTaskGroup(
-        0,
-        ImmutableMap.of("0", "0"),
-        null,
-        null,
-        Set.of("dummyTask"),
-        Collections.emptySet()
-    );
-
-    supervisor.start();
-    autoScaler.start();
-
-    supervisor.runInternal();
-    Thread.sleep(1000); // ensure a dynamic allocation notice completes
-
-    Assert.assertEquals(1, supervisor.getIoConfig().getTaskCount().intValue());
-    Assert.assertTrue(
-        dynamicActionEmitter
-            
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
-            .stream()
-            .map(metric -> 
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
-            .filter(Objects::nonNull)
-            .anyMatch("There are tasks pending completion"::equals)
-    );
-
-    
emitter.verifyNotEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC);
-    autoScaler.reset();
-    autoScaler.stop();
-  }
-
   @Test
   public void testSeekableStreamSupervisorSpecWithNoScalingOnIdleSupervisor() 
throws InterruptedException
   {
@@ -1593,6 +1627,175 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     originalSpec.validateSpecUpdateTo(proposedSpecSameSource);
   }
 
+  @Test
+  public void 
test_dynamicAllocationNotice_skipsScalingAndEmitsReason_ifTasksArePublishing() 
throws InterruptedException
+  {
+    EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
+    
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+    
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, 
true)).anyTimes();
+    
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    EasyMock.replay(spec);
+
+    
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.replay(taskMaster);
+
+    StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
+    TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(10);
+
+    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+        supervisor,
+        DATASOURCE,
+        mapper.convertValue(
+            getScaleOutProperties(2),
+            LagBasedAutoScalerConfig.class
+        ),
+        spec,
+        dynamicActionEmitter
+    );
+
+    supervisor.addTaskGroupToPendingCompletionTaskGroup(
+        0,
+        ImmutableMap.of("0", "0"),
+        null,
+        null,
+        Set.of("dummyTask"),
+        Collections.emptySet()
+    );
+
+    supervisor.start();
+    autoScaler.start();
+
+    supervisor.runInternal();
+    Thread.sleep(1000); // ensure a dynamic allocation notice completes
+
+    Assert.assertEquals(1, supervisor.getIoConfig().getTaskCount().intValue());
+    Assert.assertTrue(
+        dynamicActionEmitter
+            
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+            .stream()
+            .map(metric -> 
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
+            .filter(Objects::nonNull)
+            .anyMatch("There are tasks pending completion"::equals)
+    );
+
+    
emitter.verifyNotEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC);
+    autoScaler.reset();
+    autoScaler.stop();
+  }
+
+  @Test
+  public void test_dynamicAllocationNotice_skips_whenSupervisorSuspended() 
throws InterruptedException
+  {
+    EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
+    
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+    
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, 
true)).anyTimes();
+    
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    // Suspended → DynamicAllocationTasksNotice should return early and not 
scale
+    EasyMock.expect(spec.isSuspended()).andReturn(true).anyTimes();
+    EasyMock.replay(spec);
+
+    
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.replay(taskMaster);
+
+    TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(3);
+    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+        supervisor,
+        DATASOURCE,
+        mapper.convertValue(
+            getScaleOutProperties(2),
+            LagBasedAutoScalerConfig.class
+        ),
+        spec,
+        emitter
+    );
+
+    supervisor.start();
+    autoScaler.start();
+    supervisor.runInternal();
+
+    int before = supervisor.getIoConfig().getTaskCount();
+    Thread.sleep(1000);
+    int after = supervisor.getIoConfig().getTaskCount();
+    // No scaling expected because supervisor is suspended
+    Assert.assertEquals(before, after);
+
+    autoScaler.reset();
+    autoScaler.stop();
+  }
+
+  @Test
+  public void 
test_changeTaskCountInIOConfig_handlesExceptionAndStillUpdatesTaskCount() 
throws InterruptedException
+  {
+    EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
+    
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+    
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, 
true)).anyTimes();
+    
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    EasyMock.replay(spec);
+
+    
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    // SupervisorManager present but metadata insert fails → should be handled
+    SupervisorManager sm = EasyMock.createMock(SupervisorManager.class);
+    MetadataSupervisorManager msm = 
EasyMock.createMock(MetadataSupervisorManager.class);
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(sm)).anyTimes();
+    
EasyMock.expect(sm.getMetadataSupervisorManager()).andReturn(msm).anyTimes();
+    msm.insert(EasyMock.anyString(), EasyMock.anyObject());
+    EasyMock.expectLastCall().andThrow(new 
RuntimeException("boom")).anyTimes();
+    EasyMock.replay(taskMaster, sm, msm);
+
+    TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(10);
+    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+        supervisor,
+        DATASOURCE,
+        mapper.convertValue(
+            getScaleOutProperties(2),
+            LagBasedAutoScalerConfig.class
+        ),
+        spec,
+        emitter
+    );
+
+    supervisor.start();
+    autoScaler.start();
+    supervisor.runInternal();
+
+    int before = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(1, before);
+    Thread.sleep(1000); // allow one dynamic allocation cycle
+    int after = supervisor.getIoConfig().getTaskCount();
+    // Even though metadata insert failed, taskCount should still be updated 
in ioConfig
+    Assert.assertEquals(2, after);
+
+    autoScaler.reset();
+    autoScaler.stop();
+  }
+
   @Test
   public void testMergeSpecConfigs()
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
new file mode 100644
index 00000000000..3e4bcd92b99
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskStorage;
+import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class SeekableStreamSupervisorScaleDuringTaskRolloverTest extends 
EasyMockSupport
+{
+  private static final ObjectMapper OBJECT_MAPPER = 
TestHelper.makeJsonMapper();
+  private static final String STREAM = "stream";
+  private static final String DATASOURCE = "testDS";
+  private static final String SUPERVISOR = "supervisor";
+  private static final int DEFAULT_TASK_COUNT = 10;
+
+  private TaskStorage taskStorage;
+  private TaskMaster taskMaster;
+  private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
+  private ServiceEmitter emitter;
+  private RowIngestionMetersFactory rowIngestionMetersFactory;
+  private SeekableStreamIndexTaskClientFactory taskClientFactory;
+  private SeekableStreamSupervisorSpec spec;
+  private SupervisorStateManagerConfig supervisorConfig;
+
+  /**
+   * Builds fresh collaborators before every test. Only {@code taskMaster} is
+   * switched to replay state here; the other mocks, including {@code spec},
+   * are left in record state.
+   */
+  @Before
+  public void setUp()
+  {
+    // Real (non-mock) collaborators.
+    supervisorConfig = new SupervisorStateManagerConfig();
+    emitter = new StubServiceEmitter();
+
+    // Mocked collaborators.
+    spec = EasyMock.mock(SeekableStreamSupervisorSpec.class);
+    taskClientFactory =
+        EasyMock.mock(SeekableStreamIndexTaskClientFactory.class);
+    rowIngestionMetersFactory =
+        EasyMock.mock(RowIngestionMetersFactory.class);
+    indexerMetadataStorageCoordinator =
+        EasyMock.mock(IndexerMetadataStorageCoordinator.class);
+    taskStorage = EasyMock.mock(TaskStorage.class);
+    taskMaster = EasyMock.mock(TaskMaster.class);
+
+    // Shared taskMaster behaviour for all tests: neither a task runner nor a
+    // supervisor manager is available.
+    EasyMock.expect(taskMaster.getSupervisorManager())
+            .andReturn(Optional.absent())
+            .anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner())
+            .andReturn(Optional.absent())
+            .anyTimes();
+    EasyMock.replay(taskMaster);
+  }
+
+  /**
+   * When no autoscaler has been created on the supervisor, a rollover must
+   * leave the task count exactly as it was.
+   */
+  @Test
+  public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale()
+  {
+    // Arrange: IO config without autoscaler; spec.createAutoscaler gives null.
+    final SeekableStreamSupervisorIOConfig ioConfig =
+        getIOConfigWithoutAutoScaler(5);
+    setupSpecExpectations(ioConfig);
+    EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject()))
+            .andReturn(null)
+            .anyTimes();
+    EasyMock.replay(spec);
+
+    final TestSeekableStreamSupervisor testSupervisor =
+        new TestSeekableStreamSupervisor(10);
+    testSupervisor.start();
+    final int taskCountBefore = testSupervisor.getIoConfig().getTaskCount();
+
+    // Act
+    testSupervisor.maybeScaleDuringTaskRollover();
+
+    // Assert
+    final int taskCountAfter = testSupervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(
+        "Task count should not change when taskAutoScaler is null",
+        taskCountBefore,
+        taskCountAfter
+    );
+  }
+
+  /**
+   * A negative value from {@code computeTaskCountForRollover()} must be
+   * ignored: the task count after the rollover equals the count before it.
+   */
+  @Test
+  public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotScale()
+  {
+    // Arrange: the autoscaler stub reports -1 as rollover task count.
+    final SupervisorTaskAutoScaler negativeCountScaler =
+        createMockAutoScaler(-1);
+    setupSpecExpectations(getIOConfigWithCostBasedAutoScaler());
+    EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject()))
+            .andReturn(negativeCountScaler)
+            .anyTimes();
+    EasyMock.replay(spec);
+
+    final TestSeekableStreamSupervisor testSupervisor =
+        new TestSeekableStreamSupervisor(100);
+    testSupervisor.start();
+    testSupervisor.createAutoscaler(spec);
+    final int taskCountBefore = testSupervisor.getIoConfig().getTaskCount();
+
+    // Act
+    testSupervisor.maybeScaleDuringTaskRollover();
+
+    // Assert
+    final int taskCountAfter = testSupervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(
+        "Task count should not change when rolloverTaskCount <= 0",
+        taskCountBefore,
+        taskCountAfter
+    );
+  }
+
+  /**
+   * A positive value from {@code computeTaskCountForRollover()} must become
+   * the supervisor's new task count once the rollover hook has run.
+   */
+  @Test
+  public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScaling()
+  {
+    // Arrange: the autoscaler stub asks for 5 tasks on rollover.
+    final int targetTaskCount = 5;
+    final SupervisorTaskAutoScaler scalingStub =
+        createMockAutoScaler(targetTaskCount);
+
+    setupSpecExpectations(getIOConfigWithCostBasedAutoScaler());
+    EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject()))
+            .andReturn(scalingStub)
+            .anyTimes();
+    EasyMock.replay(spec);
+
+    final TestSeekableStreamSupervisor testSupervisor =
+        new TestSeekableStreamSupervisor(100);
+    testSupervisor.start();
+    testSupervisor.createAutoscaler(spec);
+
+    // Precondition: the supervisor starts out with a single task.
+    final int taskCountBefore = testSupervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(1, taskCountBefore);
+
+    // Act
+    testSupervisor.maybeScaleDuringTaskRollover();
+
+    // Assert
+    final int taskCountAfter = testSupervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(
+        "Task count should be updated to " + targetTaskCount + " when rolloverTaskCount > 0",
+        targetTaskCount,
+        taskCountAfter
+    );
+  }
+
+  /**
+   * A value of zero from {@code computeTaskCountForRollover()} must not change
+   * the supervisor's task count.
+   */
+  @Test
+  public void test_maybeScaleDuringTaskRollover_rolloverCountZero_doesNotScale()
+  {
+    // Arrange: the autoscaler stub reports 0 as rollover task count.
+    final SupervisorTaskAutoScaler zeroCountScaler = createMockAutoScaler(0);
+    setupSpecExpectations(getIOConfigWithCostBasedAutoScaler());
+    EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject()))
+            .andReturn(zeroCountScaler)
+            .anyTimes();
+    EasyMock.replay(spec);
+
+    final TestSeekableStreamSupervisor testSupervisor =
+        new TestSeekableStreamSupervisor(100);
+    testSupervisor.start();
+    testSupervisor.createAutoscaler(spec);
+    final int taskCountBefore = testSupervisor.getIoConfig().getTaskCount();
+
+    // Act
+    testSupervisor.maybeScaleDuringTaskRollover();
+
+    // Assert
+    final int taskCountAfter = testSupervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(
+        "Task count should not change when rolloverTaskCount is 0",
+        taskCountBefore,
+        taskCountAfter
+    );
+  }
+
+  // Helper methods for test setup
+
+  /**
+   * Records the stubbed getters on {@code spec} that the tests in this class
+   * share. The mock stays in record state: the caller adds any further
+   * expectations and then invokes {@code EasyMock.replay(spec)} itself.
+   *
+   * @param ioConfig value that {@code spec.getIoConfig()} will return
+   */
+  private void setupSpecExpectations(SeekableStreamSupervisorIOConfig ioConfig)
+  {
+    // Identity and state.
+    EasyMock.expect(spec.getId())
+            .andReturn(SUPERVISOR)
+            .anyTimes();
+    EasyMock.expect(spec.isSuspended())
+            .andReturn(false)
+            .anyTimes();
+
+    // Configuration objects.
+    EasyMock.expect(spec.getIoConfig())
+            .andReturn(ioConfig)
+            .anyTimes();
+    EasyMock.expect(spec.getTuningConfig())
+            .andReturn(getTuningConfig())
+            .anyTimes();
+    EasyMock.expect(spec.getDataSchema())
+            .andReturn(getDataSchema())
+            .anyTimes();
+    EasyMock.expect(spec.getSupervisorStateManagerConfig())
+            .andReturn(supervisorConfig)
+            .anyTimes();
+
+    // Metrics sink.
+    EasyMock.expect(spec.getEmitter())
+            .andReturn(emitter)
+            .anyTimes();
+  }
+
+  /**
+   * Builds a hand-written {@link SupervisorTaskAutoScaler} stub. Its lifecycle
+   * methods do nothing and {@code computeTaskCountForRollover()} always yields
+   * the supplied value.
+   *
+   * @param rolloverCount value returned by {@code computeTaskCountForRollover()}
+   */
+  private static SupervisorTaskAutoScaler createMockAutoScaler(int rolloverCount)
+  {
+    final SupervisorTaskAutoScaler stub = new SupervisorTaskAutoScaler()
+    {
+      @Override
+      public int computeTaskCountForRollover()
+      {
+        return rolloverCount;
+      }
+
+      @Override
+      public void start()
+      {
+        // no-op
+      }
+
+      @Override
+      public void stop()
+      {
+        // no-op
+      }
+
+      @Override
+      public void reset()
+      {
+        // no-op
+      }
+    };
+    return stub;
+  }
+
+  // Helper methods for config creation
+
+  private static CostBasedAutoScalerConfig getCostBasedAutoScalerConfig()
+  {
+    return CostBasedAutoScalerConfig.builder()
+                                    .taskCountMax(100)
+                                    .taskCountMin(1)
+                                    .enableTaskAutoScaler(true)
+                                    .lagWeight(0.3)
+                                    .idleWeight(0.7)
+                                    .scaleActionPeriodMillis(100)
+                                    .build();
+  }
+
+  private SeekableStreamSupervisorIOConfig getIOConfigWithCostBasedAutoScaler()
+  {
+    return createIOConfig(DEFAULT_TASK_COUNT, getCostBasedAutoScalerConfig());
+  }
+
+  private SeekableStreamSupervisorIOConfig getIOConfigWithoutAutoScaler(int 
taskCount)
+  {
+    return createIOConfig(taskCount, null);
+  }
+
+  private SeekableStreamSupervisorIOConfig createIOConfig(int taskCount, 
CostBasedAutoScalerConfig autoScalerConfig)
+  {
+    return new SeekableStreamSupervisorIOConfig(
+        STREAM,
+        new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false, false, false),
+        1,
+        taskCount,
+        new Period("PT1H"),
+        new Period("P1D"),
+        new Period("PT30S"),
+        false,
+        new Period("PT30M"),
+        null,
+        null,
+        autoScalerConfig,
+        LagAggregator.DEFAULT,
+        null,
+        null,
+        null
+    )
+    {
+    };
+  }
+
+  private static DataSchema getDataSchema()
+  {
+    List<DimensionSchema> dimensions = new ArrayList<>();
+    dimensions.add(StringDimensionSchema.create("dim1"));
+    dimensions.add(StringDimensionSchema.create("dim2"));
+
+    return DataSchema.builder()
+                     .withDataSource(DATASOURCE)
+                     .withTimestamp(new TimestampSpec("timestamp", "iso", 
null))
+                     .withDimensions(dimensions)
+                     .withAggregators(new CountAggregatorFactory("rows"))
+                     .withGranularity(
+                         new UniformGranularitySpec(
+                             Granularities.HOUR,
+                             Granularities.NONE,
+                             ImmutableList.of()
+                         )
+                     )
+                     .build();
+  }
+
+  /**
+   * Anonymous tuning config with fixed values: one worker thread, one chat
+   * retry, and short fixed durations for the timeouts and periods. The task
+   * tuning config it converts to is an all-{@code null} stub.
+   */
+  private static SeekableStreamSupervisorTuningConfig getTuningConfig()
+  {
+    return new SeekableStreamSupervisorTuningConfig()
+    {
+      @Override
+      public Integer getWorkerThreads()
+      {
+        return 1;
+      }
+
+      @Override
+      public Long getChatRetries()
+      {
+        return 1L;
+      }
+
+      // --- durations -------------------------------------------------------
+
+      @Override
+      public Duration getShutdownTimeout()
+      {
+        return Duration.standardSeconds(1);
+      }
+
+      @Override
+      public Duration getHttpTimeout()
+      {
+        return Duration.standardMinutes(1);
+      }
+
+      @Override
+      public Duration getRepartitionTransitionDuration()
+      {
+        return Duration.standardMinutes(2);
+      }
+
+      @Override
+      public Duration getOffsetFetchPeriod()
+      {
+        return Duration.standardMinutes(5);
+      }
+
+      // --- task tuning config ----------------------------------------------
+
+      @Override
+      public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
+      {
+        // 23 constructor arguments, all null.
+        return new SeekableStreamIndexTaskTuningConfig(
+            null, null, null, null, null, null, null, null, null, null,
+            null, null, null, null, null, null, null, null, null, null,
+            null, null, null
+        )
+        {
+          @Override
+          public String toString()
+          {
+            return null;
+          }
+
+          @Override
+          public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir)
+          {
+            return null;
+          }
+        };
+      }
+    };
+  }
+
+  // Inner test classes
+
+  /**
+   * Abstract supervisor skeleton wired to the enclosing test's collaborators.
+   * Every hook implemented here is a trivial stub (constant, {@code null} or
+   * no-op) apart from the task IO config, sequence number and report payload
+   * factories.
+   */
+  private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor<String, String, ByteEntity>
+  {
+    private BaseTestSeekableStreamSupervisor()
+    {
+      super(
+          "testSupervisorId",
+          taskStorage,
+          taskMaster,
+          indexerMetadataStorageCoordinator,
+          taskClientFactory,
+          OBJECT_MAPPER,
+          spec,
+          rowIngestionMetersFactory,
+          false
+      );
+    }
+
+    // ---------------------------------------------------------------------
+    // Naming and marker constants
+    // ---------------------------------------------------------------------
+
+    @Override
+    protected String baseTaskName()
+    {
+      return "test";
+    }
+
+    @Override
+    protected String getNotSetMarker()
+    {
+      return "NOT_SET";
+    }
+
+    @Override
+    protected String getEndOfPartitionMarker()
+    {
+      return "EOF";
+    }
+
+    @Override
+    protected boolean isEndOfShard(String seqNum)
+    {
+      return false;
+    }
+
+    @Override
+    protected boolean isShardExpirationMarker(String seqNum)
+    {
+      return false;
+    }
+
+    @Override
+    protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
+    {
+      return false;
+    }
+
+    // ---------------------------------------------------------------------
+    // Lag reporting: nothing is tracked by this stub
+    // ---------------------------------------------------------------------
+
+    @Override
+    protected void updatePartitionLagFromStream()
+    {
+      // no-op
+    }
+
+    @Nullable
+    @Override
+    protected Map<String, Long> getPartitionRecordLag()
+    {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    protected Map<String, Long> getPartitionTimeLag()
+    {
+      return null;
+    }
+
+    @Override
+    protected Map<String, Long> getRecordLagPerPartition(Map<String, String> currentOffsets)
+    {
+      return null;
+    }
+
+    @Override
+    protected Map<String, Long> getTimeLagPerPartition(Map<String, String> currentOffsets)
+    {
+      return null;
+    }
+
+    // ---------------------------------------------------------------------
+    // Task creation and matching
+    // ---------------------------------------------------------------------
+
+    @Override
+    protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
+        int groupId,
+        Map<String, String> startPartitions,
+        Map<String, String> endPartitions,
+        String baseSequenceName,
+        DateTime minimumMessageTime,
+        DateTime maximumMessageTime,
+        Set<String> exclusiveStartSequenceNumberPartitions,
+        SeekableStreamSupervisorIOConfig ioConfig
+    )
+    {
+      final SeekableStreamStartSequenceNumbers<String, String> startSequences =
+          new SeekableStreamStartSequenceNumbers<>(
+              STREAM,
+              startPartitions,
+              exclusiveStartSequenceNumberPartitions
+          );
+      final SeekableStreamEndSequenceNumbers<String, String> endSequences =
+          new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions);
+
+      // The empty anonymous body only serves to create a subclass instance.
+      return new SeekableStreamIndexTaskIOConfig<>(
+          groupId,
+          baseSequenceName,
+          startSequences,
+          endSequences,
+          true,
+          minimumMessageTime,
+          maximumMessageTime,
+          ioConfig.getInputFormat(),
+          ioConfig.getTaskDuration().getStandardMinutes()
+      )
+      {
+      };
+    }
+
+    @Override
+    protected List<SeekableStreamIndexTask<String, String, ByteEntity>> createIndexTasks(
+        int replicas,
+        String baseSequenceName,
+        ObjectMapper sortingMapper,
+        TreeMap<Integer, Map<String, String>> sequenceOffsets,
+        SeekableStreamIndexTaskIOConfig taskIoConfig,
+        SeekableStreamIndexTaskTuningConfig taskTuningConfig,
+        RowIngestionMetersFactory rowIngestionMetersFactory
+    )
+    {
+      return null;
+    }
+
+    @Override
+    protected int getTaskGroupIdForPartition(String partition)
+    {
+      // Every partition maps to task group 0.
+      return 0;
+    }
+
+    @Override
+    protected boolean doesTaskMatchSupervisor(Task task)
+    {
+      return true;
+    }
+
+    // ---------------------------------------------------------------------
+    // Metadata and sequence numbers
+    // ---------------------------------------------------------------------
+
+    @Override
+    protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata)
+    {
+      return true;
+    }
+
+    @Override
+    protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
+        String stream,
+        Map<String, String> map
+    )
+    {
+      return null;
+    }
+
+    @Override
+    protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean isExclusive)
+    {
+      return new OrderedSequenceNumber<>(seq, isExclusive)
+      {
+        @Override
+        public int compareTo(OrderedSequenceNumber<String> other)
+        {
+          // Sequence strings are parsed and compared as integers.
+          final BigInteger mine = new BigInteger(this.get());
+          final BigInteger theirs = new BigInteger(other.get());
+          return mine.compareTo(theirs);
+        }
+      };
+    }
+
+    // ---------------------------------------------------------------------
+    // Record supplier and reporting
+    // ---------------------------------------------------------------------
+
+    @Override
+    protected RecordSupplier<String, String, ByteEntity> setupRecordSupplier()
+    {
+      // NOTE(review): recordSupplier is not declared in this test class;
+      // presumably it is a field inherited from the superclass - confirm.
+      return recordSupplier;
+    }
+
+    @Override
+    protected SeekableStreamSupervisorReportPayload<String, String> createReportPayload(
+        int numPartitions,
+        boolean includeOffsets
+    )
+    {
+      // Fixed payload; numPartitions and includeOffsets are not consulted.
+      return new SeekableStreamSupervisorReportPayload<>(
+          SUPERVISOR,
+          DATASOURCE,
+          STREAM,
+          1,
+          1,
+          1L,
+          null, null, null, null, null, null,
+          false,
+          true,
+          null, null, null
+      )
+      {
+      };
+    }
+  }
+
+  /**
+   * Concrete test supervisor: reports a fixed partition count, all-zero lag
+   * statistics, and does not schedule any reporting.
+   */
+  private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor
+  {
+    /** Value handed back by {@link #getPartitionCount()}. */
+    private final int partitionNumbers;
+
+    public TestSeekableStreamSupervisor(int partitionNumbers)
+    {
+      this.partitionNumbers = partitionNumbers;
+    }
+
+    @Override
+    public int getPartitionCount()
+    {
+      return partitionNumbers;
+    }
+
+    @Override
+    public LagStats computeLagStats()
+    {
+      final LagStats zeroLag = new LagStats(0, 0, 0);
+      return zeroLag;
+    }
+
+    @Override
+    protected void scheduleReporting(ScheduledExecutorService reportingExec)
+    {
+      // Intentionally empty: nothing is scheduled on reportingExec.
+    }
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 5df9edd184d..71e798e7e2f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -2751,6 +2751,28 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     Assert.assertEquals(12, config.getMaxAllowedStops());
   }
 
+  /**
+   * Checks that {@code createAutoscaler} hands back {@code null} when the
+   * supplied spec's {@code createAutoscaler} returns {@code null}, and that
+   * the spec is asked exactly once.
+   */
+  @Test
+  public void testCreateAutoscalerStoresAndReturnsAutoscaler()
+  {
+    // Lenient expectations on the shared mocks.
+    EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE))
+            .andReturn(Map.of())
+            .anyTimes();
+    EasyMock.expect(recordSupplier.getPartitionIds(STREAM))
+            .andReturn(ImmutableSet.of(SHARD_ID))
+            .anyTimes();
+    EasyMock.expect(spec.isSuspended())
+            .andReturn(false)
+            .anyTimes();
+
+    replayAll();
+
+    final SeekableStreamSupervisor supervisorUnderTest =
+        new TestSeekableStreamSupervisor();
+
+    // Dedicated spec mock, replayed and verified explicitly below; it must be
+    // asked for an autoscaler exactly once and answers with null.
+    final SeekableStreamSupervisorSpec nullAutoscalerSpec =
+        EasyMock.createMock(SeekableStreamSupervisorSpec.class);
+    EasyMock.expect(nullAutoscalerSpec.createAutoscaler(supervisorUnderTest))
+            .andReturn(null)
+            .once();
+    EasyMock.replay(nullAutoscalerSpec);
+
+    final Object createdAutoscaler =
+        supervisorUnderTest.createAutoscaler(nullAutoscalerSpec);
+    Assert.assertNull(createdAutoscaler);
+    EasyMock.verify(nullAutoscalerSpec);
+
+    verifyAll();
+  }
+
   private static DataSchema getDataSchema()
   {
     List<DimensionSchema> dimensions = new ArrayList<>();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 44bbef9c6a1..caf5453f521 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
 
-import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
@@ -35,6 +34,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import static 
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME;
+import static 
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIVE_MINUTE_NAME;
+import static 
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.ONE_MINUTE_NAME;
 import static org.mockito.Mockito.when;
 
 public class CostBasedAutoScalerTest
@@ -69,12 +71,23 @@ public class CostBasedAutoScalerTest
   {
     // For 100 partitions at 25 tasks (4 partitions/task), valid counts 
include 25 and 34
     int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(100, 
25);
+    Assert.assertTrue("Should contain the current task count", 
contains(validTaskCounts, 25));
+    Assert.assertTrue("Should contain the next scale-up option", 
contains(validTaskCounts, 34));
 
-    Assert.assertTrue("Should contain current task count", 
contains(validTaskCounts, 25));
-    Assert.assertTrue("Should contain next scale-up option", 
contains(validTaskCounts, 34));
+    // Edge cases
+    Assert.assertEquals("Zero partitions return empty array", 0, 
CostBasedAutoScaler.computeValidTaskCounts(0, 10).length);
+    Assert.assertEquals("Negative partitions return empty array", 0, 
CostBasedAutoScaler.computeValidTaskCounts(-5, 10).length);
 
-    // Edge case: zero partitions returns empty array
-    Assert.assertEquals(0, CostBasedAutoScaler.computeValidTaskCounts(0, 
10).length);
+    // Single partition
+    int[] singlePartition = CostBasedAutoScaler.computeValidTaskCounts(1, 1);
+    Assert.assertTrue("Single partition should have at least one valid count", 
singlePartition.length > 0);
+    Assert.assertTrue("Single partition should contain 1", 
contains(singlePartition, 1));
+
+    // Current exceeds partitions - should still yield valid, deduplicated 
options
+    int[] exceedsPartitions = CostBasedAutoScaler.computeValidTaskCounts(2, 5);
+    Assert.assertEquals(2, exceedsPartitions.length);
+    Assert.assertTrue(contains(exceedsPartitions, 1));
+    Assert.assertTrue(contains(exceedsPartitions, 2));
   }
 
   @Test
@@ -82,42 +95,24 @@ public class CostBasedAutoScalerTest
   {
     Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(null));
     Assert.assertEquals(-1, 
autoScaler.computeOptimalTaskCount(createMetrics(0.0, 10, 0, 0.0)));
-  }
-
-  @Test
-  public void testComputeOptimalTaskCountIdleInIdealRange()
-  {
-    // When idle is in ideal range [0.2, 0.6], no scaling should occur
-    Assert.assertEquals(-1, 
autoScaler.computeOptimalTaskCount(createMetrics(5000.0, 25, 100, 0.4)));
+    Assert.assertEquals(-1, 
autoScaler.computeOptimalTaskCount(createMetrics(100.0, 10, -5, 0.3)));
+    Assert.assertEquals(-1, 
autoScaler.computeOptimalTaskCount(createMetrics(100.0, -1, 100, 0.3)));
   }
 
   @Test
   public void testComputeOptimalTaskCountScaling()
   {
     // High idle (underutilized) - should scale down
-    // With high idle (0.8), the algorithm evaluates lower task counts and 
finds they have lower idle cost
     int scaleDownResult = 
autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8));
     Assert.assertTrue("Should scale down when idle > 0.6", scaleDownResult < 
25);
-  }
 
-  @Test
-  public void 
testComputeOptimalTaskCountLowIdleDoesNotScaleUpWithBalancedWeights()
-  {
-    // With corrected idle ratio model and marginal lag model, low idle does 
not
-    // automatically trigger scale-up. The algorithm is conservative because:
-    // 1. Scale-up increases idle cost (more tasks = more idle per task with 
fixed load)
-    // 2. Marginal lag model means only ADDITIONAL tasks work on backlog
-    //
-    // This is intentional: the idle-heavy weights (0.4 idle) make the 
algorithm
-    // favor stability over aggressive scaling
-    int result = autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 
100, 0.1));
-
-    // Algorithm evaluates costs and may find current count optimal
-    // or may scale down if idle cost reduction outweighs lag increase
-    Assert.assertTrue(
-        "With low idle and balanced weights, algorithm should not scale up 
aggressively",
-        result == -1 || result <= 25
-    );
+    // Very high idle with high task count - should scale down
+    int highIdleResult = 
autoScaler.computeOptimalTaskCount(createMetrics(10.0, 50, 100, 0.9));
+    Assert.assertTrue("Scale down scenario should return optimal <= current", 
highIdleResult <= 50);
+
+    // With low idle and balanced weights, algorithm should not scale up 
aggressively
+    int lowIdleResult = 
autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1));
+    Assert.assertTrue("With low idle and balanced weights, should not scale up 
aggressively", lowIdleResult <= 25);
   }
 
   @Test
@@ -142,35 +137,219 @@ public class CostBasedAutoScalerTest
   }
 
   @Test
-  public void testExtractProcessingRateMovingAverage()
+  public void testExtractPollIdleRatioInvalidTypes()
+  {
+    // Non-map task metric
+    Map<String, Map<String, Object>> nonMapTask = new HashMap<>();
+    nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
+    Assert.assertEquals(0., 
CostBasedAutoScaler.extractPollIdleRatio(nonMapTask), 0.0001);
+
+    // Empty autoscaler metrics
+    Map<String, Map<String, Object>> emptyAutoscaler = new HashMap<>();
+    Map<String, Object> taskStats1 = new HashMap<>();
+    taskStats1.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, new 
HashMap<>());
+    emptyAutoscaler.put("0", Collections.singletonMap("task-0", taskStats1));
+    Assert.assertEquals(0., 
CostBasedAutoScaler.extractPollIdleRatio(emptyAutoscaler), 0.0001);
+
+    // Non-map autoscaler metrics
+    Map<String, Map<String, Object>> nonMapAutoscaler = new HashMap<>();
+    Map<String, Object> taskStats2 = new HashMap<>();
+    taskStats2.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, 
"not-a-map");
+    nonMapAutoscaler.put("0", Collections.singletonMap("task-0", taskStats2));
+    Assert.assertEquals(0., 
CostBasedAutoScaler.extractPollIdleRatio(nonMapAutoscaler), 0.0001);
+
+    // Non-number poll idle ratio
+    Map<String, Map<String, Object>> nonNumberRatio = new HashMap<>();
+    Map<String, Object> taskStats3 = new HashMap<>();
+    Map<String, Object> autoscalerMetrics = new HashMap<>();
+    autoscalerMetrics.put(SeekableStreamIndexTaskRunner.POLL_IDLE_RATIO_KEY, 
"not-a-number");
+    taskStats3.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, 
autoscalerMetrics);
+    nonNumberRatio.put("0", Collections.singletonMap("task-0", taskStats3));
+    Assert.assertEquals(0., 
CostBasedAutoScaler.extractPollIdleRatio(nonNumberRatio), 0.0001);
+  }
+
+  @Test
+  public void testExtractMovingAverage()
   {
     // Null and empty return -1
-    Assert.assertEquals(
-        -1.,
-        CostBasedAutoScaler.extractMovingAverage(null, 
DropwizardRowIngestionMeters.FIVE_MINUTE_NAME),
-        0.0001
-    );
-    Assert.assertEquals(
-        -1.,
-        CostBasedAutoScaler.extractMovingAverage(
-            Collections.emptyMap(),
-            DropwizardRowIngestionMeters.FIVE_MINUTE_NAME
-        ),
-        0.0001
-    );
+    Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(null), 
0.0001);
+    Assert.assertEquals(-1., 
CostBasedAutoScaler.extractMovingAverage(Collections.emptyMap()), 0.0001);
 
     // Missing metrics return -1
     Map<String, Map<String, Object>> missingMetrics = new HashMap<>();
     missingMetrics.put("0", Collections.singletonMap("task-0", new 
HashMap<>()));
-    Assert.assertEquals(-1., 
CostBasedAutoScaler.extractMovingAverage(missingMetrics, 
DropwizardRowIngestionMeters.FIVE_MINUTE_NAME), 0.0001);
+    Assert.assertEquals(-1., 
CostBasedAutoScaler.extractMovingAverage(missingMetrics), 0.0001);
 
-    // Valid stats return average
+    // Valid stats return average (using 5-minute)
     Map<String, Map<String, Object>> validStats = new HashMap<>();
     Map<String, Object> group = new HashMap<>();
     group.put("task-0", buildTaskStatsWithMovingAverage(1000.0));
     group.put("task-1", buildTaskStatsWithMovingAverage(2000.0));
     validStats.put("0", group);
-    Assert.assertEquals(1500.0, 
CostBasedAutoScaler.extractMovingAverage(validStats, 
DropwizardRowIngestionMeters.FIVE_MINUTE_NAME), 0.0001);
+    Assert.assertEquals(1500.0, 
CostBasedAutoScaler.extractMovingAverage(validStats), 0.0001);
+  }
+
+  @Test
+  public void testExtractMovingAverageIntervalFallback()
+  {
+    // 15-minute average is preferred
+    Map<String, Map<String, Object>> fifteenMin = new HashMap<>();
+    fifteenMin.put("0", Collections.singletonMap("task-0", 
buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 1500.0)));
+    Assert.assertEquals(1500.0, 
CostBasedAutoScaler.extractMovingAverage(fifteenMin), 0.0001);
+
+    // 1-minute as final fallback
+    Map<String, Map<String, Object>> oneMin = new HashMap<>();
+    oneMin.put("0", Collections.singletonMap("task-0", 
buildTaskStatsWithMovingAverageForInterval(ONE_MINUTE_NAME, 500.0)));
+    Assert.assertEquals(500.0, 
CostBasedAutoScaler.extractMovingAverage(oneMin), 0.0001);
+
+    // 15-minute preferred over 5-minute when both available
+    Map<String, Map<String, Object>> allIntervals = new HashMap<>();
+    allIntervals.put("0", Collections.singletonMap("task-0", 
buildTaskStatsWithMultipleMovingAverages(1500.0, 1000.0, 500.0)));
+    Assert.assertEquals(1500.0, 
CostBasedAutoScaler.extractMovingAverage(allIntervals), 0.0001);
+
+    // Falls back to 5-minute when 15-minute is null
+    Map<String, Map<String, Object>> nullFifteen = new HashMap<>();
+    nullFifteen.put("0", Collections.singletonMap("task-0", 
buildTaskStatsWithNullInterval(FIFTEEN_MINUTE_NAME, FIVE_MINUTE_NAME, 750.0)));
+    Assert.assertEquals(750.0, 
CostBasedAutoScaler.extractMovingAverage(nullFifteen), 0.0001);
+
+    // Falls back to 1-minute when both 15 and 5 are null
+    Map<String, Map<String, Object>> bothNull = new HashMap<>();
+    bothNull.put("0", Collections.singletonMap("task-0", 
buildTaskStatsWithTwoNullIntervals(250.0)));
+    Assert.assertEquals(250.0, 
CostBasedAutoScaler.extractMovingAverage(bothNull), 0.0001);
+  }
+
+  @Test
+  public void testExtractMovingAverageInvalidTypes()
+  {
+    // Non-map task metric
+    Map<String, Map<String, Object>> nonMapTask = new HashMap<>();
+    nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
+    Assert.assertEquals(-1., 
CostBasedAutoScaler.extractMovingAverage(nonMapTask), 0.0001);
+
+    // Missing buildSegments
+    Map<String, Map<String, Object>> missingBuild = new HashMap<>();
+    Map<String, Object> taskStats1 = new HashMap<>();
+    taskStats1.put("movingAverages", new HashMap<>());
+    missingBuild.put("0", Collections.singletonMap("task-0", taskStats1));
+    Assert.assertEquals(-1., 
CostBasedAutoScaler.extractMovingAverage(missingBuild), 0.0001);
+
+    // Non-map movingAverages
+    Map<String, Map<String, Object>> nonMapMA = new HashMap<>();
+    Map<String, Object> taskStats2 = new HashMap<>();
+    taskStats2.put("movingAverages", "not-a-map");
+    nonMapMA.put("0", Collections.singletonMap("task-0", taskStats2));
+    Assert.assertEquals(-1., 
CostBasedAutoScaler.extractMovingAverage(nonMapMA), 0.0001);
+
+    // Non-map buildSegments
+    Map<String, Map<String, Object>> nonMapBS = new HashMap<>();
+    Map<String, Object> taskStats3 = new HashMap<>();
+    Map<String, Object> movingAverages3 = new HashMap<>();
+    movingAverages3.put(RowIngestionMeters.BUILD_SEGMENTS, "not-a-map");
+    taskStats3.put("movingAverages", movingAverages3);
+    nonMapBS.put("0", Collections.singletonMap("task-0", taskStats3));
+    Assert.assertEquals(-1., 
CostBasedAutoScaler.extractMovingAverage(nonMapBS), 0.0001);
+
+    // Non-map interval data
+    Map<String, Map<String, Object>> nonMapInterval = new HashMap<>();
+    Map<String, Object> taskStats4 = new HashMap<>();
+    Map<String, Object> movingAverages4 = new HashMap<>();
+    Map<String, Object> buildSegments4 = new HashMap<>();
+    buildSegments4.put(FIFTEEN_MINUTE_NAME, "not-a-map");
+    movingAverages4.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments4);
+    taskStats4.put("movingAverages", movingAverages4);
+    nonMapInterval.put("0", Collections.singletonMap("task-0", taskStats4));
+    Assert.assertEquals(-1., 
CostBasedAutoScaler.extractMovingAverage(nonMapInterval), 0.0001);
+
+    // Non-number processed rate
+    Map<String, Map<String, Object>> nonNumberRate = new HashMap<>();
+    Map<String, Object> taskStats5 = new HashMap<>();
+    Map<String, Object> movingAverages5 = new HashMap<>();
+    Map<String, Object> buildSegments5 = new HashMap<>();
+    Map<String, Object> fifteenMin = new HashMap<>();
+    fifteenMin.put(RowIngestionMeters.PROCESSED, "not-a-number");
+    buildSegments5.put(FIFTEEN_MINUTE_NAME, fifteenMin);
+    movingAverages5.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments5);
+    taskStats5.put("movingAverages", movingAverages5);
+    nonNumberRate.put("0", Collections.singletonMap("task-0", taskStats5));
+    Assert.assertEquals(-1., 
CostBasedAutoScaler.extractMovingAverage(nonNumberRate), 0.0001);
+  }
+
+  @Test
+  public void testComputeTaskCountForRolloverReturnsMinusOneWhenSuspended()
+  {
+    SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
+    SeekableStreamSupervisor supervisor = 
Mockito.mock(SeekableStreamSupervisor.class);
+    ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
+    SeekableStreamSupervisorIOConfig ioConfig = 
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+    when(spec.getId()).thenReturn("s-up");
+    when(spec.isSuspended()).thenReturn(true);
+    when(supervisor.getIoConfig()).thenReturn(ioConfig);
+    when(ioConfig.getStream()).thenReturn("stream");
+
+    CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder()
+                                                             .taskCountMax(10)
+                                                             .taskCountMin(1)
+                                                             
.enableTaskAutoScaler(true)
+                                                             .lagWeight(0.5)
+                                                             .idleWeight(0.5)
+                                                             .build();
+
+    CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg, 
spec, emitter);
+    Assert.assertEquals(-1, scaler.computeTaskCountForRollover());
+  }
+
+  @Test
+  public void testComputeTaskCountForRolloverReturnsMinusOneWhenLagStatsNull()
+  {
+    SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
+    SeekableStreamSupervisor supervisor = 
Mockito.mock(SeekableStreamSupervisor.class);
+    ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
+    SeekableStreamSupervisorIOConfig ioConfig = 
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+    when(spec.getId()).thenReturn("s-up");
+    when(spec.isSuspended()).thenReturn(false);
+    when(supervisor.computeLagStats()).thenReturn(null);
+    when(supervisor.getIoConfig()).thenReturn(ioConfig);
+    when(ioConfig.getStream()).thenReturn("stream");
+
+    CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder()
+                                                             .taskCountMax(10)
+                                                             .taskCountMin(1)
+                                                             
.enableTaskAutoScaler(true)
+                                                             .lagWeight(0.5)
+                                                             .idleWeight(0.5)
+                                                             .build();
+
+    CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg, 
spec, emitter);
+    Assert.assertEquals(-1, scaler.computeTaskCountForRollover());
+  }
+
+  @Test
+  public void testComputeTaskCountForRolloverReturnsMinusOneWhenNoMetrics()
+  {
+    // Tests the case where lastKnownMetrics is null (because 
computeTaskCountForScaleAction was never called)
+    SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
+    SeekableStreamSupervisor supervisor = 
Mockito.mock(SeekableStreamSupervisor.class);
+    ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
+    SeekableStreamSupervisorIOConfig ioConfig = 
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+    when(spec.getId()).thenReturn("s-up");
+    when(spec.isSuspended()).thenReturn(false);
+    when(supervisor.getIoConfig()).thenReturn(ioConfig);
+    when(ioConfig.getStream()).thenReturn("stream");
+
+    CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder()
+                                                             .taskCountMax(10)
+                                                             .taskCountMin(1)
+                                                             
.enableTaskAutoScaler(true)
+                                                             .lagWeight(0.5)
+                                                             .idleWeight(0.5)
+                                                             .build();
+
+    CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg, 
spec, emitter);
+    // Should return -1 when lastKnownMetrics is null
+    Assert.assertEquals(-1, scaler.computeTaskCountForRollover());
   }
 
   private CostMetrics createMetrics(
@@ -213,7 +392,68 @@ public class CostBasedAutoScalerTest
   private Map<String, Object> buildTaskStatsWithMovingAverage(double 
processedRate)
   {
     Map<String, Object> buildSegments = new HashMap<>();
-    buildSegments.put(DropwizardRowIngestionMeters.FIVE_MINUTE_NAME, 
Map.of(RowIngestionMeters.PROCESSED, processedRate));
+    buildSegments.put(FIVE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED, 
processedRate));
+
+    Map<String, Object> movingAverages = new HashMap<>();
+    movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments);
+
+    Map<String, Object> taskStats = new HashMap<>();
+    taskStats.put("movingAverages", movingAverages);
+    return taskStats;
+  }
+
+  private Map<String, Object> 
buildTaskStatsWithMovingAverageForInterval(String intervalName, double 
processedRate)
+  {
+    Map<String, Object> buildSegments = new HashMap<>();
+    buildSegments.put(intervalName, Map.of(RowIngestionMeters.PROCESSED, 
processedRate));
+
+    Map<String, Object> movingAverages = new HashMap<>();
+    movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments);
+
+    Map<String, Object> taskStats = new HashMap<>();
+    taskStats.put("movingAverages", movingAverages);
+    return taskStats;
+  }
+
+  private Map<String, Object> buildTaskStatsWithMultipleMovingAverages(
+      double fifteenMinRate,
+      double fiveMinRate,
+      double oneMinRate
+  )
+  {
+    Map<String, Object> buildSegments = new HashMap<>();
+    buildSegments.put(FIFTEEN_MINUTE_NAME, 
Map.of(RowIngestionMeters.PROCESSED, fifteenMinRate));
+    buildSegments.put(FIVE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED, 
fiveMinRate));
+    buildSegments.put(ONE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED, 
oneMinRate));
+
+    Map<String, Object> movingAverages = new HashMap<>();
+    movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments);
+
+    Map<String, Object> taskStats = new HashMap<>();
+    taskStats.put("movingAverages", movingAverages);
+    return taskStats;
+  }
+
+  private Map<String, Object> buildTaskStatsWithNullInterval(String 
nullInterval, String validInterval, double processedRate)
+  {
+    Map<String, Object> buildSegments = new HashMap<>();
+    buildSegments.put(nullInterval, null);
+    buildSegments.put(validInterval, Map.of(RowIngestionMeters.PROCESSED, 
processedRate));
+
+    Map<String, Object> movingAverages = new HashMap<>();
+    movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments);
+
+    Map<String, Object> taskStats = new HashMap<>();
+    taskStats.put("movingAverages", movingAverages);
+    return taskStats;
+  }
+
+  private Map<String, Object> buildTaskStatsWithTwoNullIntervals(double 
oneMinRate)
+  {
+    Map<String, Object> buildSegments = new HashMap<>();
+    buildSegments.put(FIFTEEN_MINUTE_NAME, null);
+    buildSegments.put(FIVE_MINUTE_NAME, null);
+    buildSegments.put(ONE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED, 
oneMinRate));
 
     Map<String, Object> movingAverages = new HashMap<>();
     movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
index 0c5602052d4..90b50477c92 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
@@ -47,11 +47,11 @@ public class WeightedCostFunctionTest
   {
     CostMetrics validMetrics = createMetrics(100000.0, 10, 100, 0.3);
 
-    Assert.assertEquals(Double.POSITIVE_INFINITY, 
costFunction.computeCost(null, 10, config), 0.0);
-    Assert.assertEquals(Double.POSITIVE_INFINITY, 
costFunction.computeCost(validMetrics, 10, null), 0.0);
-    Assert.assertEquals(Double.POSITIVE_INFINITY, 
costFunction.computeCost(validMetrics, 0, config), 0.0);
-    Assert.assertEquals(Double.POSITIVE_INFINITY, 
costFunction.computeCost(validMetrics, -5, config), 0.0);
-    Assert.assertEquals(Double.POSITIVE_INFINITY, 
costFunction.computeCost(createMetrics(0.0, 10, 0, 0.3), 10, config), 0.0);
+    Assert.assertEquals(Double.POSITIVE_INFINITY, 
costFunction.computeCost(null, 10, config).totalCost(), 0.0);
+    Assert.assertEquals(Double.POSITIVE_INFINITY, 
costFunction.computeCost(validMetrics, 10, null).totalCost(), 0.0);
+    Assert.assertEquals(Double.POSITIVE_INFINITY, 
costFunction.computeCost(validMetrics, 0, config).totalCost(), 0.0);
+    Assert.assertEquals(Double.POSITIVE_INFINITY, 
costFunction.computeCost(validMetrics, -5, config).totalCost(), 0.0);
+    Assert.assertEquals(Double.POSITIVE_INFINITY, 
costFunction.computeCost(createMetrics(0.0, 10, 0, 0.3), 10, 
config).totalCost(), 0.0);
   }
 
   @Test
@@ -68,8 +68,8 @@ public class WeightedCostFunctionTest
 
     CostMetrics metrics = createMetrics(200000.0, 10, 200, 0.3);
 
-    double costCurrent = costFunction.computeCost(metrics, 10, lagOnlyConfig);
-    double costScaleDown = costFunction.computeCost(metrics, 5, lagOnlyConfig);
+    double costCurrent = costFunction.computeCost(metrics, 10, 
lagOnlyConfig).totalCost();
+    double costScaleDown = costFunction.computeCost(metrics, 5, 
lagOnlyConfig).totalCost();
 
     // Scale down uses absolute model: lag / (5 * rate) = higher recovery time
     // Current uses absolute model: lag / (10 * rate) = lower recovery time
@@ -97,15 +97,15 @@ public class WeightedCostFunctionTest
     CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3);
 
     // Current (10 tasks): uses absolute model = 10M / (10 * 1000) = 1000s
-    double costCurrent = costFunction.computeCost(metrics, 10, lagOnlyConfig);
+    double costCurrent = costFunction.computeCost(metrics, 10, 
lagOnlyConfig).totalCost();
     Assert.assertEquals("Cost at current tasks", 1000., costCurrent, 0.1);
 
     // Scale up by 5 (to 15): marginal model = 10M / (15 * 1000) = 666.7s
-    double costUp5 = costFunction.computeCost(metrics, 15, lagOnlyConfig);
+    double costUp5 = costFunction.computeCost(metrics, 15, 
lagOnlyConfig).totalCost();
     Assert.assertEquals("Cost when scaling up by 5", 666.7, costUp5, 0.1);
 
     // Scale up by 10 (to 20): marginal model = 10M / (20 * 1000) = 500s
-    double costUp10 = costFunction.computeCost(metrics, 20, lagOnlyConfig);
+    double costUp10 = costFunction.computeCost(metrics, 20, 
lagOnlyConfig).totalCost();
     Assert.assertEquals("Cost when scaling up by 10", 500.0, costUp10, 0.01);
 
     // Adding more tasks reduces lag recovery time
@@ -120,8 +120,8 @@ public class WeightedCostFunctionTest
     // This is intentional behavior: the algorithm is conservative about 
scale-up.
     CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3);
 
-    double costCurrent = costFunction.computeCost(metrics, 10, config);
-    double costScaleUp = costFunction.computeCost(metrics, 20, config);
+    double costCurrent = costFunction.computeCost(metrics, 10, 
config).totalCost();
+    double costScaleUp = costFunction.computeCost(metrics, 20, 
config).totalCost();
 
     // With balanced weights (0.3 lag, 0.7 idle), the idle cost increase from
     // scaling up dominates the lag recovery benefit
@@ -154,8 +154,8 @@ public class WeightedCostFunctionTest
 
     CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.1);
 
-    double costLag = costFunction.computeCost(metrics, 10, lagOnly);
-    double costIdle = costFunction.computeCost(metrics, 10, idleOnly);
+    double costLag = costFunction.computeCost(metrics, 10, 
lagOnly).totalCost();
+    double costIdle = costFunction.computeCost(metrics, 10, 
idleOnly).totalCost();
 
     Assert.assertNotEquals("Different weights should produce different costs", 
costLag, costIdle, 0.0001);
     Assert.assertTrue("Lag-only cost should be positive", costLag > 0.0);
@@ -170,9 +170,9 @@ public class WeightedCostFunctionTest
     int currentTaskCount = 10;
     CostMetrics metricsNoRate = createMetricsWithRate(50000.0, 
currentTaskCount, 100, 0.3, 0.0);
 
-    double costAtCurrent = costFunction.computeCost(metricsNoRate, 
currentTaskCount, config);
-    double costScaleUp = costFunction.computeCost(metricsNoRate, 
currentTaskCount + 5, config);
-    double costScaleDown = costFunction.computeCost(metricsNoRate, 
currentTaskCount - 5, config);
+    double costAtCurrent = costFunction.computeCost(metricsNoRate, 
currentTaskCount, config).totalCost();
+    double costScaleUp = costFunction.computeCost(metricsNoRate, 
currentTaskCount + 5, config).totalCost();
+    double costScaleDown = costFunction.computeCost(metricsNoRate, 
currentTaskCount - 5, config).totalCost();
 
     Assert.assertTrue(
         "Cost at current should be less than cost for scale up",
@@ -201,8 +201,8 @@ public class WeightedCostFunctionTest
         .defaultProcessingRate(1000.0)
         .build();
 
-    double costUp5 = costFunction.computeCost(metricsNoRate, currentTaskCount 
+ 5, lagOnlyConfig);
-    double costDown5 = costFunction.computeCost(metricsNoRate, 
currentTaskCount - 5, lagOnlyConfig);
+    double costUp5 = costFunction.computeCost(metricsNoRate, currentTaskCount 
+ 5, lagOnlyConfig).totalCost();
+    double costDown5 = costFunction.computeCost(metricsNoRate, 
currentTaskCount - 5, lagOnlyConfig).totalCost();
 
     Assert.assertEquals(
         "Lag cost for +5 and -5 deviation should be equal",
@@ -229,10 +229,10 @@ public class WeightedCostFunctionTest
     // Current: 10 tasks with 40% idle (60% busy)
     CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4);
 
-    double costAt5 = costFunction.computeCost(metrics, 5, idleOnlyConfig);
-    double costAt10 = costFunction.computeCost(metrics, 10, idleOnlyConfig);
-    double costAt15 = costFunction.computeCost(metrics, 15, idleOnlyConfig);
-    double costAt20 = costFunction.computeCost(metrics, 20, idleOnlyConfig);
+    double costAt5 = costFunction.computeCost(metrics, 5, 
idleOnlyConfig).totalCost();
+    double costAt10 = costFunction.computeCost(metrics, 10, 
idleOnlyConfig).totalCost();
+    double costAt15 = costFunction.computeCost(metrics, 15, 
idleOnlyConfig).totalCost();
+    double costAt20 = costFunction.computeCost(metrics, 20, 
idleOnlyConfig).totalCost();
 
     // Monotonically increasing idle cost as tasks increase
     Assert.assertTrue("cost(5) < cost(10)", costAt5 < costAt10);
@@ -256,7 +256,7 @@ public class WeightedCostFunctionTest
     // busyFraction = 0.6, taskRatio = 0.2
     // predictedIdle = 1 - 0.6/0.2 = 1 - 3 = -2 → clamped to 0
     CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4);
-    double costAt2 = costFunction.computeCost(metrics, 2, idleOnlyConfig);
+    double costAt2 = costFunction.computeCost(metrics, 2, 
idleOnlyConfig).totalCost();
 
     // idlenessCost = taskCount * taskDuration * 0.0 (clamped) = 0
     Assert.assertEquals("Idle cost should be 0 when predicted idle is clamped 
to 0", 0.0, costAt2, 0.0001);
@@ -266,7 +266,7 @@ public class WeightedCostFunctionTest
     // busyFraction = 0.9, taskRatio = 10
     // predictedIdle = 1 - 0.9/10 = 1 - 0.09 = 0.91 (within bounds)
     CostMetrics lowIdle = createMetrics(0.0, 10, 100, 0.1);
-    double costAt100 = costFunction.computeCost(lowIdle, 100, idleOnlyConfig);
+    double costAt100 = costFunction.computeCost(lowIdle, 100, 
idleOnlyConfig).totalCost();
     // idlenessCost = 100 * 3600 * 0.91 = 327600
     Assert.assertTrue("Cost should be finite and positive", 
Double.isFinite(costAt100) && costAt100 > 0);
   }
@@ -286,8 +286,8 @@ public class WeightedCostFunctionTest
     // Negative idle ratio indicates missing data → should default to 0.5
     CostMetrics missingIdleData = createMetrics(0.0, 10, 100, -1.0);
 
-    double cost10 = costFunction.computeCost(missingIdleData, 10, 
idleOnlyConfig);
-    double cost20 = costFunction.computeCost(missingIdleData, 20, 
idleOnlyConfig);
+    double cost10 = costFunction.computeCost(missingIdleData, 10, 
idleOnlyConfig).totalCost();
+    double cost20 = costFunction.computeCost(missingIdleData, 20, 
idleOnlyConfig).totalCost();
 
     // With missing data, predicted idle = 0.5 for all task counts
     // idlenessCost at 10 = 10 * 3600 * 0.5 = 18000
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 30a0d7a723e..ce6c87e5e08 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
 
 import javax.annotation.Nullable;
@@ -86,6 +87,11 @@ public interface Supervisor
     return null; // default implementation for interface compatibility; 
returning null since true or false is misleading
   }
 
+  default SupervisorTaskAutoScaler createAutoscaler(SupervisorSpec spec)
+  {
+    return spec.createAutoscaler(this);
+  }
+
   /**
    * Resets any stored metadata by the supervisor.
    * @param dataSourceMetadata optional dataSource metadata.
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
index c921e2740b8..17cd347231a 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
@@ -24,4 +24,15 @@ public interface SupervisorTaskAutoScaler
   void start();
   void stop();
   void reset();
+
+  /**
+   * Computes the optimal task count during task rollover, allowing a 
non-disruptive scale-down.
+   * Must be called by the supervisor when tasks are ending their duration.
+   *
+   * @return optimal task count for scale-down, or -1 if no change needed
+   */
+  default int computeTaskCountForRollover()
+  {
+    return -1;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to