This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 24b757c39ae Do not kill a task if offsets are inconsistent but publish from another group is pending (#19091)
24b757c39ae is described below

commit 24b757c39ae5425afb59f925f3cbfd3141701fa0
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Mar 10 03:03:51 2026 +0530

    Do not kill a task if offsets are inconsistent but publish from another group is pending (#19091)
    
    Follow up to #19034
    
    Changes
    ---------
    - Add method `SeekableStreamSupervisor.isAnotherTaskGroupPublishingToPartitions()`
    - Use this method to check if a task needs to wait before publishing its own offsets
    - Update `SegmentTransactionalAppendAction` and `SegmentTransactionalInsertAction` to return a retryable error response only if there is a pending publish that conflicts with the current action
    - Fix behaviour of scale down on task rollover in `SeekableStreamSupervisor`
    - Fix bug in `SeekableStreamSupervisorIOConfig`
    - Fix bug in `CostBasedAutoScaler` to avoid spurious scale downs
    - Validate metrics in `CostBasedAutoScaler` before proceeding with scaling action
    - Add new tests in `CostBasedAutoScalerIntegrationTest`
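    The retry-vs-fail decision described in the changes above can be sketched as a
    small standalone snippet. This is an illustrative simplification, not the actual
    Druid code: `PublishRetryDecision` and its boolean parameters are hypothetical
    stand-ins for `SegmentPublishResult` and the supervisor lookup performed by
    `isAnotherTaskGroupPublishingToPartitions()`.

```java
// Hypothetical sketch of the publish-retry decision introduced by this commit.
// A failed publish is retried only when it is marked retryable AND another task
// group still has a pending publish to overlapping partitions; otherwise the
// failure is returned immediately so the task is not kept alive (or killed) needlessly.
public class PublishRetryDecision
{
    public static boolean shouldRetry(
        boolean publishSucceeded,
        boolean retryable,
        boolean anotherGroupPublishingToPartitions
    )
    {
        if (publishSucceeded || !retryable) {
            // Nothing to retry: either the publish is done, or the failure is permanent.
            return false;
        }
        // Retry later only while a conflicting publish from another group is pending.
        return anotherGroupPublishingToPartitions;
    }

    public static void main(String[] args)
    {
        // Conflicting publish pending -> keep retrying instead of failing the task.
        System.out.println(shouldRetry(false, true, true));
        // No pending publish -> fail immediately.
        System.out.println(shouldRetry(false, true, false));
        // Successful publish -> no retry needed.
        System.out.println(shouldRetry(true, true, true));
    }
}
```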
---
 docs/ingestion/supervisor.md                       |   6 +-
 .../CostBasedAutoScalerIntegrationTest.java        | 264 +++++++++++----------
 .../kinesis/supervisor/KinesisSupervisorTest.java  |   3 +-
 .../actions/SegmentTransactionalAppendAction.java  |   7 +-
 .../actions/SegmentTransactionalInsertAction.java  |   7 +-
 .../indexing/common/actions/TaskActionToolbox.java |  26 ++
 .../overlord/supervisor/SupervisorManager.java     |  51 ++++
 .../supervisor/SeekableStreamSupervisor.java       | 179 ++++++++++----
 .../SeekableStreamSupervisorIOConfig.java          |   9 +-
 .../supervisor/autoscaler/CostBasedAutoScaler.java | 151 ++++++++----
 .../supervisor/autoscaler/CostMetrics.java         |   2 +-
 .../supervisor/autoscaler/CostResult.java          |   6 -
 .../autoscaler/WeightedCostFunction.java           |  21 +-
 .../SegmentTransactionalInsertActionTest.java      |   2 +-
 ...treamSupervisorScaleDuringTaskRolloverTest.java |  14 +-
 .../SeekableStreamSupervisorStateTest.java         |   2 +-
 .../autoscaler/CostBasedAutoScalerMockTest.java    |   5 +
 .../autoscaler/CostBasedAutoScalerTest.java        |   1 +
 .../autoscaler/WeightedCostFunctionTest.java       |  46 ++--
 .../TransactionalSegmentPublisher.java             |  10 +-
 .../TransactionalSegmentPublisherTest.java         |   8 +-
 21 files changed, 539 insertions(+), 281 deletions(-)

diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 360e6ad86d2..8595912e846 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -77,8 +77,8 @@ The following table outlines the configuration properties for `autoScalerConfig`
 |--------|-----------|--------|-------|
 |`enableTaskAutoScaler`|Enables the autoscaler. If not specified, Druid disables the autoscaler even when `autoScalerConfig` is not null.|No|`false`|
 |`taskCountMax`|The maximum number of ingestion tasks. Must be greater than or equal to `taskCountMin`. If `taskCountMax` is greater than the number of Kafka partitions or Kinesis shards, Druid sets the maximum number of reading tasks to the number of Kafka partitions or Kinesis shards and ignores `taskCountMax`.|Yes||
-|`taskCountMin`|The minimum number of ingestion tasks. When you enable the autoscaler, Druid ignores the value of `taskCount` in `ioConfig` and starts with the `taskCountMin` number of tasks to launch.|Yes||
-|`taskCountStart`|Optional config to specify the number of ingestion tasks to start with. When you enable the autoscaler, Druid ignores the value of `taskCount` in `ioConfig` and, if specified, starts with the `taskCountStart` number of tasks. Otherwise, defaults to `taskCountMin`.|No|`taskCountMin`|
+|`taskCountMin`|The minimum number of ingestion tasks. When you enable the autoscaler, Druid computes the initial number of tasks to launch by checking the configs in the following order: `taskCountStart`, then `taskCount` (in `ioConfig`), then `taskCountMin`.|Yes||
+|`taskCountStart`|Optional config to specify the number of ingestion tasks to start with. When you enable the autoscaler, Druid computes the initial number of tasks to launch by checking the configs in the following order: `taskCountStart`, then `taskCount` (in `ioConfig`), then `taskCountMin`.|No|`taskCount` or `taskCountMin`|
 |`minTriggerScaleActionFrequencyMillis`|The minimum time interval between two scale actions.| No|600000|
 |`autoScalerStrategy`|The algorithm of autoscaler. Druid only supports the `lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more information.|No|`lagBased`|
 |`stopTaskCountRatio`|A variable version of `ioConfig.stopTaskCount` with a valid range of (0.0, 1.0]. Allows the maximum number of stoppable tasks in steady state to be proportional to the number of tasks currently running.|No||
@@ -209,7 +209,7 @@ The following table outlines the configuration properties related to the `costBa
 |`lagWeight`|The weight of extracted lag value in cost function.| No| 0.25 |
 |`idleWeight`|The weight of extracted poll idle value in cost function. | No | 0.75 |
 |`useTaskCountBoundaries`|Enables the bounded partitions-per-task window when selecting task counts.|No| `false` |
-|`highLagThreshold`|Per-partition lag threshold that triggers burst scale-up when set to a value greater than `0`. Set to a negative value to disable burst scale-up.|No|-1|
+|`highLagThreshold`|Average partition lag threshold that triggers burst scale-up when set to a value greater than `0`. Set to a negative value to disable burst scale-up.|No|-1|
 |`minScaleDownDelay`|Minimum duration between successful scale actions, specified as an ISO-8601 duration string.|No|`PT30M`|
 |`scaleDownDuringTaskRolloverOnly`|Indicates whether task scaling down is limited to periods during task rollovers only.|No|`false`|
 
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
index 52d2bb54397..6bdb13236bf 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
@@ -19,38 +19,34 @@
 
 package org.apache.druid.testing.embedded.indexing.autoscaler;
 
-import org.apache.druid.data.input.impl.TimestampSpec;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.kafka.simulate.KafkaResource;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler;
 import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.DruidMetrics;
-import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.rpc.RequestBuilder;
 import org.apache.druid.testing.embedded.EmbeddedClusterApis;
-import org.apache.druid.testing.embedded.EmbeddedCoordinator;
 import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
-import org.apache.druid.testing.embedded.EmbeddedHistorical;
-import org.apache.druid.testing.embedded.EmbeddedIndexer;
-import org.apache.druid.testing.embedded.EmbeddedOverlord;
-import org.apache.druid.testing.embedded.EmbeddedRouter;
-import org.apache.druid.testing.embedded.indexing.MoreResources;
-import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
-import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.druid.testing.embedded.StreamIngestResource;
+import org.apache.druid.testing.embedded.indexing.StreamIndexTestBase;
 import org.hamcrest.Matchers;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
+import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.Duration;
 import org.joda.time.Period;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
-import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.OPTIMAL_TASK_COUNT_METRIC;
 
@@ -60,64 +56,38 @@ import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.Cos
  * Tests the autoscaler's ability to compute optimal task counts based on partition count and cost metrics (lag and idle time).
  */
 @SuppressWarnings("resource")
-public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
+public class CostBasedAutoScalerIntegrationTest extends StreamIndexTestBase
 {
-  private static final String TOPIC = EmbeddedClusterApis.createTestDatasourceName();
-  private static final String EVENT_TEMPLATE = "{\"timestamp\":\"%s\",\"dimension\":\"value%d\",\"metric\":%d}";
   private static final int PARTITION_COUNT = 50;
 
-  private final EmbeddedBroker broker = new EmbeddedBroker();
-  private final EmbeddedIndexer indexer = new EmbeddedIndexer();
-  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
-  private final EmbeddedHistorical historical = new EmbeddedHistorical();
-  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
-  private KafkaResource kafkaServer;
+  private String topic;
+  private final KafkaResource kafkaServer = new KafkaResource();
+
+  @Override
+  protected StreamIngestResource<?> getStreamIngestResource()
+  {
+    return kafkaServer;
+  }
 
   @Override
   public EmbeddedDruidCluster createCluster()
   {
-    final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();
-
-    kafkaServer = new KafkaResource()
-    {
-      @Override
-      public void start()
-      {
-        super.start();
-        createTopicWithPartitions(TOPIC, PARTITION_COUNT);
-        produceRecordsToKafka(500, 1);
-      }
+    return super
+        .createCluster()
+        .useDefaultTimeoutForLatchableEmitter(600);
+  }
 
-      @Override
-      public void stop()
-      {
-        deleteTopic(TOPIC);
-        super.stop();
-      }
-    };
-
-    indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
-           .addProperty("druid.worker.capacity", "100");
-
-    cluster.useLatchableEmitter()
-           .useDefaultTimeoutForLatchableEmitter(60)
-           .addResource(kafkaServer)
-           .addServer(coordinator)
-           .addServer(overlord)
-           .addServer(indexer)
-           .addServer(broker)
-           .addServer(historical)
-           .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.5s")
-           .addServer(new EmbeddedRouter());
-
-    return cluster;
+  @BeforeEach
+  public void createTopic()
+  {
+    topic = dataSource;
+    kafkaServer.createTopicWithPartitions(topic, PARTITION_COUNT);
   }
 
   @Test
   @Timeout(45)
   public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
   {
-    final String superId = dataSource + "_super";
     final int initialTaskCount = 10;
 
     final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
@@ -135,10 +105,10 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
         .minScaleDownDelay(Duration.ZERO)
         .build();
 
-    final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, initialTaskCount);
+    final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(autoScalerConfig, initialTaskCount);
 
     // Submit the supervisor
-    Assertions.assertEquals(superId, cluster.callApi().postSupervisor(spec));
+    Assertions.assertEquals(spec.getId(), cluster.callApi().postSupervisor(spec));
 
     // Wait for the supervisor to be healthy and running
     overlord.latchableEmitter()
@@ -159,8 +129,6 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
   @Test
   public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
   {
-    final String superId = dataSource + "_super_scaleup";
-
     // Start with a low task count (1 task for 50 partitions) and produce a large amount of data
     // to create lag pressure and low idle ratio, which should trigger a scale-up decision.
     // With the ideal idle range [0.2, 0.6], a single overloaded task will have idle < 0.2,
@@ -169,7 +137,11 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
 
     // Produce additional records to create a backlog / lag
     // This ensures tasks are busy processing (low idle ratio)
-    Executors.newSingleThreadExecutor().submit(() -> produceRecordsToKafka(500_000, 20));
+    Executors.newSingleThreadExecutor().submit(() -> {
+      for (int i = 0; i < 500; ++i) {
+        publish1kRecords(topic, true);
+      }
+    });
 
     // These values were carefully handpicked to allow that test to pass in a stable manner.
     final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
@@ -185,13 +157,12 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
         .build();
 
     final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisorWithAutoScaler(
-        superId,
         autoScalerConfig,
         lowInitialTaskCount
     );
 
     // Submit the supervisor
-    Assertions.assertEquals(superId, cluster.callApi().postSupervisor(kafkaSupervisorSpec));
+    Assertions.assertEquals(kafkaSupervisorSpec.getId(), cluster.callApi().postSupervisor(kafkaSupervisorSpec));
 
     // Wait for the supervisor to be healthy and running
     overlord.latchableEmitter()
@@ -209,10 +180,93 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
     cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
   }
 
+  @Test
+  public void test_autoScaler_scalesUpAndDown_withSlowPublish()
+  {
+    final String topic = EmbeddedClusterApis.createTestDatasourceName();
+    kafkaServer.createTopicWithPartitions(topic, 4);
+
+    // A small value of maxRowsPerSegment ensures that there are a large number
+    // of segments to publish, thus slowing down publish actions
+    final int maxRowsPerSegment = 100;
+
+    final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
+        .builder()
+        .taskCountMin(1)
+        .taskCountMax(4)
+        .lagWeight(1.0)
+        .idleWeight(1.0)
+        .enableTaskAutoScaler(true)
+        .minTriggerScaleActionFrequencyMillis(10L)
+        .scaleActionPeriodMillis(10L)
+        .minScaleDownDelay(Duration.standardSeconds(1))
+        .build();
+
+    // taskDuration of 10s gives enough time to auto-scaler to fetch task metrics
+    final SupervisorSpec supervisor = createKafkaSupervisor(kafkaServer)
+        .withTuningConfig(t -> t.withMaxRowsPerSegment(maxRowsPerSegment))
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withTaskCount(1)
+                .withTaskDuration(Period.seconds(10))
+                .withSupervisorRunPeriod(Period.millis(10))
+                .withAutoScalerConfig(autoScalerConfig)
+        )
+        .build(dataSource, topic);
+    cluster.callApi().postSupervisor(supervisor);
+
+    // Ingest a large number of records to trigger a scale-up
+    // 10k records = 100 segments to publish * 100 rows per segment
+    int totalRecords = 0;
+    for (int i = 0; i < 10; ++i) {
+      totalRecords += publish1kRecords(topic, false);
+    }
+
+    // Wait for tasks to scale up
+    overlord.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName("task/autoScaler/updatedCount")
+                      .hasDimension(DruidMetrics.SUPERVISOR_ID, supervisor.getId())
+                      .hasValueMatching(Matchers.equalTo(4L))
+    );
+    Assertions.assertEquals(4, getCurrentTaskCount(supervisor.getId()));
+    waitUntilPublishedRecordsAreIngested(totalRecords);
+
+    // Let the tasks work through the lag.
+    // Do not publish any more records so that the idleness causes scale-down
+    overlord.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName("task/autoScaler/updatedCount")
+                      .hasDimension(DruidMetrics.SUPERVISOR_ID, supervisor.getId())
+                      .hasValueMatching(Matchers.equalTo(1L))
+    );
+    Assertions.assertEquals(1, getCurrentTaskCount(supervisor.getId()));
+
+    cluster.callApi().postSupervisor(supervisor.createSuspendedSpec());
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+    Assertions.assertEquals("10000", cluster.runSql("SELECT COUNT(*) FROM %s", dataSource));
+
+    final List<TaskStatusPlus> tasks = cluster.callApi().getTasks(dataSource, "complete");
+    Assertions.assertFalse(tasks.isEmpty());
+
+    // Ensure that there are no task failures due to auto-scaling
+    final String expectedErrorOnShutdown = "Killing task for graceful shutdown";
+    final Map<String, String> taskIdToError = new HashMap<>();
+    for (TaskStatusPlus task : tasks) {
+      if (task.getStatusCode() == TaskState.FAILED && !expectedErrorOnShutdown.equals(task.getErrorMsg())) {
+        taskIdToError.put(task.getId(), task.getErrorMsg());
+      }
+    }
+    Assertions.assertTrue(
+        taskIdToError.isEmpty(),
+        StringUtils.format(
+            "[%d / %d] tasks have failed with errors: %s",
+            taskIdToError.size(), tasks.size(), taskIdToError
+        )
+    );
+  }
+
   @Test
   void test_scaleDownDuringTaskRollover()
   {
-    final String superId = dataSource + "_super";
     final int initialTaskCount = 10;
 
     final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
@@ -220,9 +274,8 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
         .enableTaskAutoScaler(true)
         .taskCountMin(1)
         .taskCountMax(10)
-        .taskCountStart(initialTaskCount)
-        .scaleActionPeriodMillis(2000)
-        .minTriggerScaleActionFrequencyMillis(2000)
+        .scaleActionPeriodMillis(100)
+        .minTriggerScaleActionFrequencyMillis(100)
         // High idle weight ensures scale-down when tasks are mostly idle (little data to process)
         .lagWeight(0.1)
         .idleWeight(0.9)
@@ -231,10 +284,10 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
         .minScaleDownDelay(Duration.ZERO)
         .build();
 
-    final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, initialTaskCount);
+    final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(autoScalerConfig, initialTaskCount);
 
     // Submit the supervisor
-    Assertions.assertEquals(superId, cluster.callApi().postSupervisor(spec));
+    Assertions.assertEquals(spec.getId(), cluster.callApi().postSupervisor(spec));
 
     // Wait for at least one task running for the datasource managed by the supervisor.
     overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/run/time")
@@ -242,19 +295,14 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
 
     // Wait for autoscaler to emit metric indicating scale-down, it should be just less than the current task count.
     overlord.latchableEmitter().waitForEvent(
-        event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
-                      .hasValueMatching(Matchers.lessThan((long) initialTaskCount)));
-
-    // Wait for tasks to complete (first rollover)
-    overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/action/success/count"));
-
-    // Wait for the task running for the datasource managed by a supervisor.
-    overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/run/time")
-                                                   .hasDimension(DruidMetrics.DATASOURCE, dataSource));
+        event -> event.hasMetricName("task/autoScaler/updatedCount")
+                      .hasDimension(DruidMetrics.SUPERVISOR_ID, spec.getId())
+                      .hasValueMatching(Matchers.lessThan((long) initialTaskCount))
+    );
 
     // After rollover, verify that the running task count has decreased
     // The autoscaler should have recommended fewer tasks due to high idle time
-    final int postRolloverRunningTasks = cluster.callApi().getTaskCount("running", dataSource);
+    final int postRolloverRunningTasks = getCurrentTaskCount(spec.getId());
 
     Assertions.assertTrue(
         postRolloverRunningTasks < initialTaskCount,
@@ -269,53 +317,29 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
     cluster.callApi().postSupervisor(spec.createSuspendedSpec());
   }
 
-  private void produceRecordsToKafka(int recordCount, int iterations)
-  {
-    int recordCountPerSlice = recordCount / iterations;
-    int counter = 0;
-    for (int i = 0; i < iterations; i++) {
-      DateTime timestamp = DateTime.now(DateTimeZone.UTC);
-      List<ProducerRecord<byte[], byte[]>> records = IntStream
-          .range(counter, counter + recordCountPerSlice)
-          .mapToObj(k -> new ProducerRecord<byte[], byte[]>(
-                        TOPIC,
-                        k % PARTITION_COUNT,
-                        null,
-                        StringUtils.format(EVENT_TEMPLATE, timestamp, k, k)
-                                   .getBytes(StandardCharsets.UTF_8)
-                    )
-          )
-          .collect(Collectors.toList());
-
-      kafkaServer.produceRecordsToTopic(records);
-      try {
-        Thread.sleep(100L);
-        counter += recordCountPerSlice;
-      }
-      catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
   private KafkaSupervisorSpec createKafkaSupervisorWithAutoScaler(
-      String supervisorId,
       CostBasedAutoScalerConfig autoScalerConfig,
       int taskCount
   )
   {
-    return MoreResources.Supervisor.KAFKA_JSON
-        .get()
-        .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", "iso", null)))
-        .withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(100))
+    return createKafkaSupervisor(kafkaServer)
         .withIoConfig(
             ioConfig -> ioConfig
-                .withConsumerProperties(kafkaServer.consumerProperties())
                 .withTaskCount(taskCount)
-                .withTaskDuration(Period.seconds(7))
+                .withTaskDuration(Period.seconds(1))
                 .withAutoScalerConfig(autoScalerConfig)
         )
-        .withId(supervisorId)
-        .build(dataSource, TOPIC);
+        .build(dataSource, topic);
+  }
+
+  private int getCurrentTaskCount(String supervisorId)
+  {
+    final String getSupervisorPath = StringUtils.format("/druid/indexer/v1/supervisor/%s", supervisorId);
+    final KafkaSupervisorSpec supervisorSpec = cluster.callApi().serviceClient().onLeaderOverlord(
+        mapper -> new RequestBuilder(HttpMethod.GET, getSupervisorPath),
+        new TypeReference<>(){}
+    );
+    Assertions.assertNotNull(supervisorSpec);
+    return supervisorSpec.getSpec().getIOConfig().getTaskCount();
   }
 }
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 78d3c6791ab..522aebad4cf 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -428,8 +428,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     replayAll();
 
     int taskCountInit = supervisor.getIoConfig().getTaskCount();
-    // when enable autoScaler the init taskCount will be equal to taskCountMin
-    Assert.assertEquals(1, taskCountInit);
+    Assert.assertEquals(2, taskCountInit);
     supervisor.getIoConfig().setTaskCount(2);
 
     supervisor.start();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
index 3833c4be6b0..15001fcd4a2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
@@ -219,7 +219,12 @@ public class SegmentTransactionalAppendAction implements TaskAction<SegmentPubli
     }
 
     IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
-    return retVal;
+
+    if (toolbox.shouldFailSegmentPublishImmediately(retVal, task, supervisorId, startMetadata)) {
+      return SegmentPublishResult.fail(retVal.getErrorMsg());
+    } else {
+      return retVal;
+    }
   }
 
   @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index ab84c38cc6a..bf121812573 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -259,7 +259,12 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli
     }
 
     IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
-    return retVal;
+
+    if (toolbox.shouldFailSegmentPublishImmediately(retVal, task, supervisorId, startMetadata)) {
+      return SegmentPublishResult.fail(retVal.getErrorMsg());
+    } else {
+      return retVal;
+    }
   }
 
   private void checkWithSegmentLock()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java
index d430e3f5a9f..6294710f167 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java
@@ -23,8 +23,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.GlobalTaskLockbox;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.SegmentPublishResult;
 import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.indexing.overlord.TaskRunnerFactory;
 import org.apache.druid.indexing.overlord.TaskStorage;
@@ -135,4 +138,27 @@ public class TaskActionToolbox
   {
     return segmentAllocationQueue != null && segmentAllocationQueue.isEnabled();
   }
+
+  /**
+   * Checks if the given publish action should be failed without allowing any
+   * more retries. A failed publish action should be retried only if there is
+   * another task waiting to publish offsets for an overlapping set of partitions.
+   */
+  public boolean shouldFailSegmentPublishImmediately(
+      SegmentPublishResult result,
+      Task task,
+      String supervisorId,
+      DataSourceMetadata startMetadata
+  )
+  {
+    if (result.isSuccess() || !result.isRetryable() || startMetadata == null) {
+      return false;
+    }
+
+    return !getSupervisorManager().isAnotherTaskGroupPublishingToPartitions(
+        supervisorId,
+        task.getId(),
+        startMetadata
+    );
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 0cd799bed54..426f3a71619 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -27,11 +27,14 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Inject;
 import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.error.NotFound;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import org.apache.druid.java.util.common.Pair;
@@ -427,6 +430,54 @@ public class SupervisorManager
     return false;
   }
 
+  /**
+   * Checks if there is a Task distinct from the given {@code taskId} or its replicas
+   * that is currently waiting to publish offsets for the given partitions.
+   */
+  public boolean isAnotherTaskGroupPublishingToPartitions(
+      String supervisorId,
+      String taskId,
+      DataSourceMetadata startMetadata
+  )
+  {
+    try {
+      InvalidInput.conditionalException(supervisorId != null, "'supervisorId' cannot be null");
+      if (!(startMetadata instanceof SeekableStreamDataSourceMetadata<?, ?>)) {
+        throw InvalidInput.exception(
+              "Start metadata[%s] of type[%s] is not valid streaming metadata",
+            supervisorId, startMetadata.getClass()
+        );
+      }
+
+      Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
+      if (supervisor == null || supervisor.rhs == null) {
+        throw NotFound.exception("Could not find supervisor[%s]", supervisorId);
+      }
+      if (!(supervisor.lhs instanceof SeekableStreamSupervisor<?, ?, ?>)) {
+        throw InvalidInput.exception(
+            "Supervisor[%s] of type[%s] is not a streaming supervisor",
+            supervisorId, supervisor.rhs.getType()
+        );
+      }
+
+      final Set<Object> partitionIds = Set.copyOf(
+          ((SeekableStreamDataSourceMetadata<?, ?>) startMetadata)
+              .getSeekableStreamSequenceNumbers()
+              .getPartitionSequenceNumberMap()
+              .keySet()
+      );
+      return ((SeekableStreamSupervisor<?, ?, ?>) supervisor.lhs)
+          .isAnotherTaskGroupPublishingToPartitions(taskId, partitionIds);
+    }
+    catch (Exception e) {
+      log.error(
+          e,
+          "Failed to check if a publish is pending for supervisor[%s], 
metadata[%s]",
+          supervisorId, startMetadata
+      );
+      return false;
+    }
+  }
 
   /**
    * Stops a supervisor with a given id and then removes it from the list.
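A standalone sketch of the overlap check that the new `SupervisorManager` method delegates to: build a map from partition id to the pending-completion groups publishing to it, then test the caller's partitions against that map while skipping the caller's own group (its replicas). `Group`, `isAnotherGroupPublishing`, and the use of `Integer` partition ids are illustrative stand-ins; the real code keys on `PartitionIdType` and `TaskGroup`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PublishOverlapCheck
{
  // Minimal stand-in for a pending-completion task group.
  public static class Group
  {
    final Set<String> taskIds;
    final Set<Integer> partitions;

    public Group(Set<String> taskIds, Set<Integer> partitions)
    {
      this.taskIds = taskIds;
      this.partitions = partitions;
    }
  }

  public static boolean isAnotherGroupPublishing(
      String taskId,
      Set<Integer> taskPartitions,
      List<Group> pendingGroups
  )
  {
    // Partitions currently being published by groups other than the caller's
    final Map<Integer, Set<Group>> partitionToGroups = new HashMap<>();
    for (Group group : pendingGroups) {
      if (group.taskIds.contains(taskId)) {
        continue; // same group: the task itself or one of its replicas
      }
      for (Integer partition : group.partitions) {
        partitionToGroups.computeIfAbsent(partition, p -> new HashSet<>()).add(group);
      }
    }

    // The caller must wait if any of its partitions overlaps a pending publish
    for (Integer partition : taskPartitions) {
      if (partitionToGroups.containsKey(partition)) {
        return true;
      }
    }
    return false;
  }
}
```

If this returns true, the task defers publishing instead of being killed for inconsistent offsets.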
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index f5128dc3809..dbe79b85780 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -130,6 +130,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
@@ -157,6 +158,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
   public static final String AUTOSCALER_SKIP_REASON_DIMENSION = "scalingSkipReason";
   public static final String AUTOSCALER_REQUIRED_TASKS_METRIC = "task/autoScaler/requiredCount";
+  public static final String AUTOSCALER_UPDATED_TASK_METRIC = "task/autoScaler/updatedCount";
   public static final String AUTOSCALER_SCALING_TIME_METRIC = "task/autoScaler/scaleActionTime";
 
   private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
@@ -503,7 +505,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                                                               .setDimension(DruidMetrics.DATASOURCE, dataSource)
                                                               .setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
           for (CopyOnWriteArrayList<TaskGroup> list : pendingCompletionTaskGroups.values()) {
-            if (!list.isEmpty()) {
+            // There are expected to be pending tasks if this scaling is happening on task rollover
+            if (!list.isEmpty() && !isScalingTasksOnRollover.get()) {
               log.info(
                   "Skipping DynamicAllocationTasksNotice execution for supervisor[%s] for datasource[%s] because following tasks are pending [%s]",
                   supervisorId,
@@ -591,9 +594,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   private boolean changeTaskCount(int desiredActiveTaskCount)
       throws InterruptedException, ExecutionException
   {
-    int currentActiveTaskCount;
-    Collection<TaskGroup> activeTaskGroups = activelyReadingTaskGroups.values();
-    currentActiveTaskCount = activeTaskGroups.size();
+    final int currentActiveTaskCount = getCurrentTaskCount();
 
     if (desiredActiveTaskCount < 0 || desiredActiveTaskCount == currentActiveTaskCount) {
       return false;
       return false;
@@ -609,19 +610,15 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       gracefulShutdownInternal();
       changeTaskCountInIOConfig(desiredActiveTaskCount);
       clearPartitionAssignmentsForScaling();
-      emitter.emit(ServiceMetricEvent.builder()
-                                     .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
-                                     .setDimension(DruidMetrics.DATASOURCE, dataSource)
-                                     .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
-                                     .setDimensionIfNotNull(
-                                         DruidMetrics.TAGS,
-                                         spec.getContextValue(DruidMetrics.TAGS)
-                                     )
-                                     .setMetric(
-                                         AUTOSCALER_SCALING_TIME_METRIC,
-                                         scaleActionStopwatch.millisElapsed()
-                                     ));
-      log.info("Changed taskCount to [%s] for supervisor[%s] for dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource);
+
+      final ServiceMetricEvent.Builder metricBuilder = getMetricBuilder();
+      emitter.emit(metricBuilder.setMetric(AUTOSCALER_SCALING_TIME_METRIC, scaleActionStopwatch.millisElapsed()));
+      emitter.emit(metricBuilder.setMetric(AUTOSCALER_UPDATED_TASK_METRIC, (long) desiredActiveTaskCount));
+      log.info(
+          "Updated taskCount from [%s] to [%s] for supervisor[%s] for dataSource[%s].",
+          currentActiveTaskCount, desiredActiveTaskCount, supervisorId, dataSource
+      );
       return true;
     }
   }
@@ -643,6 +640,14 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+  /**
+   * @return Current task count for this supervisor.
+   */
+  private int getCurrentTaskCount()
+  {
+    return getIoConfig().getTaskCount();
+  }
+
   /**
    * Clears previous partition assignments in preparation for an upcoming scaling event.
    * <p>
@@ -968,6 +973,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * Wired by {@link SupervisorManager} after supervisor creation.
    */
   private volatile SupervisorTaskAutoScaler taskAutoScaler;
+  private final AtomicBoolean isScalingTasksOnRollover = new AtomicBoolean(false);
 
   // snapshots latest sequences from the stream to be verified in the next run cycle of inactive stream check
   private final Map<PartitionIdType, SequenceOffsetType> previousSequencesFromStream = new HashMap<>();
@@ -2615,9 +2621,17 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
                   return sequence.compareTo(latestOffset) == 0;
                 }
-            ) && earliestConsistentSequenceId.compareAndSet(-1, sequenceCheckpoint.getKey())) || (
-                !pendingCompletionTaskGroups.getOrDefault(groupId, new CopyOnWriteArrayList<>()).isEmpty()
-                && earliestConsistentSequenceId.compareAndSet(-1, taskCheckpoints.firstKey()))) {
+            ) && earliestConsistentSequenceId.compareAndSet(-1, sequenceCheckpoint.getKey()))
+            // OR if there is an older publishing task for the same groupId, use the first sequenceId of this task
+            // since the offsets persisted in metadata store might still be behind the checkpoints of this task
+            || (!pendingCompletionTaskGroups.getOrDefault(groupId, new CopyOnWriteArrayList<>()).isEmpty()
+                && earliestConsistentSequenceId.compareAndSet(-1, taskCheckpoints.firstKey()))
+
+            // OR if there is an older publishing task for another groupId, use the first sequenceId of this task
+            // since the offsets persisted in metadata store might still be behind the checkpoints of this task
+            || (isAnotherTaskGroupPublishingToPartitions(taskId, Set.copyOf(taskGroup.startingSequences.keySet()))
+                && earliestConsistentSequenceId.compareAndSet(-1, taskCheckpoints.firstKey()))
+        ) {
           final SortedMap<Integer, Map<PartitionIdType, SequenceOffsetType>> latestCheckpoints = new TreeMap<>(
               taskCheckpoints.tailMap(earliestConsistentSequenceId.get())
           );
@@ -2626,8 +2640,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           taskGroup.checkpointSequences.clear();
           taskGroup.checkpointSequences.putAll(latestCheckpoints);
         } else {
-          log.debug(
-              "Adding task[%s] to kill list, checkpoints[%s], latestoffsets from DB [%s]",
+          log.warn(
+              "Adding task[%s] to kill list as its checkpoints[%s] are not consistent with checkpoints[%s] in metadata store",
               taskId,
               taskCheckpoints,
               latestOffsetsFromDb
@@ -2641,8 +2655,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                                 .equals(taskGroup.checkpointSequences.firstEntry().getValue()))
             || taskCheckpoints.tailMap(taskGroup.checkpointSequences.firstKey()).size()
                != taskGroup.checkpointSequences.size()) {
-          log.debug(
-              "Adding task[%s] to kill list, checkpoints[%s], taskgroup checkpoints [%s]",
+          log.warn(
+              "Adding task[%s] to kill list as its checkpoints[%s] are not consistent with taskGroup checkpoints[%s]",
               taskId,
               taskCheckpoints,
               taskGroup.checkpointSequences
@@ -2669,18 +2683,57 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         sequenceCheckpoint -> {
           killTask(
               sequenceCheckpoint.lhs,
-              "Killing task[%s], as its checkpoints[%s] are not consistent with group checkpoints[%s]"
-              + " or latest persisted sequences in metadata store[%s].",
-              sequenceCheckpoint.lhs,
-              sequenceCheckpoint.rhs,
-              taskGroup.checkpointSequences,
-              latestOffsetsFromDb
+              "Killing task as its checkpoints are not consistent with group checkpoints"
+              + " or latest persisted sequences in metadata store."
           );
           taskGroup.removeTask(sequenceCheckpoint.lhs);
         }
     );
   }
 
+  /**
+   * Checks if there is a Task distinct from the given {@code taskId} or its replicas
+   * publishing to any of the given partitions. If this method returns true, it
+   * indicates that the current task would need to wait for the other tasks to
+   * finish publishing before it can publish its own offsets.
+   */
+  public boolean isAnotherTaskGroupPublishingToPartitions(String taskId, Set<Object> partitions)
+  {
+    // Identify all the partitions that are being published by other taskGroups
+    final Map<PartitionIdType, Set<TaskGroup>> partitionIdToPublishingGroups = new HashMap<>();
+    pendingCompletionTaskGroups.values().stream().flatMap(Collection::stream).forEach(group -> {
+      // Do not consider this taskId or its replicas
+      if (group.taskIds().contains(taskId)) {
+        return;
+      }
+      for (PartitionIdType partitionId : group.startingSequences.keySet()) {
+        partitionIdToPublishingGroups.computeIfAbsent(partitionId, p -> new HashSet<>()).add(group);
+      }
+    });
+
+    if (partitionIdToPublishingGroups.isEmpty()) {
+      return false;
+    }
+
+    // Check if any of the partitions of this taskId are being published by other taskGroups
+    for (Object partition : partitions) {
+      @SuppressWarnings("unchecked")
+      final PartitionIdType partitionId = (PartitionIdType) partition;
+      if (partitionIdToPublishingGroups.containsKey(partitionId)) {
+        log.info(
+            "Task[%s] needs to wait before publishing as other taskGroups[%s] are currently publishing to partition[%s].",
+            taskId,
+            partitionIdToPublishingGroups.get(partitionId),
+            partitionId
+        );
+        return true;
+      }
+    }
+
+    return false;
+  }
+
   @VisibleForTesting
   protected void addDiscoveredTaskToPendingCompletionTaskGroups(
       int groupId,
@@ -2911,7 +2964,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   public int getPartitionCount()
   {
-    return recordSupplier.getPartitionIds(ioConfig.getStream()).size();
+    recordSupplierLock.lock();
+    try {
+      return recordSupplier.getPartitionIds(ioConfig.getStream()).size();
+    }
+    finally {
+      recordSupplierLock.unlock();
+    }
   }
 
   private boolean updatePartitionDataFromStream()
@@ -3412,6 +3471,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
 
     final AtomicInteger numStoppedTasks = new AtomicInteger();
+    final AtomicBoolean hasTaskRolloverStarted = new AtomicBoolean(false);
     // Sort task groups by start time to prioritize early termination of earlier groups, then iterate for processing
     activelyReadingTaskGroups.entrySet().stream().sorted(
             Comparator.comparingLong(
@@ -3435,6 +3495,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
             if (group.getHandoffEarly()) {
               numStoppedTasks.getAndIncrement();
             }
+            if (stopTasksEarly) {
+              hasTaskRolloverStarted.set(true);
+            }
           } else if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
             // Stop this task group if it has run longer than the configured duration
             // and the pending task groups are less than the configured stop task count.
@@ -3448,6 +3511,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
               futureGroupIds.add(groupId);
               futures.add(checkpointTaskGroup(group, true));
               numStoppedTasks.getAndIncrement();
+              hasTaskRolloverStarted.set(true);
             }
           }
         });
@@ -3503,7 +3567,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       activelyReadingTaskGroups.remove(groupId);
     }
 
-    maybeScaleDuringTaskRollover();
+    // If rollover has started, check if scaling needs to be done
+    if (numStoppedTasks.get() > 0 && hasTaskRolloverStarted.get()) {
+      maybeScaleDuringTaskRollover();
+    }
   }
 
   /**
@@ -3511,27 +3578,27 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * <p>
    * This method is invoked to determine whether a task count adjustment is needed
    * during a task rollover based on the recommendations from the task auto-scaler.
- */
-
+   */
   @VisibleForTesting
-  void maybeScaleDuringTaskRollover()
+  void maybeScaleDuringTaskRollover() throws ExecutionException, InterruptedException
   {
-    if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
-      int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
-      if (rolloverTaskCount > 0 && rolloverTaskCount != getIoConfig().getTaskCount()) {
-        log.info("Autoscaler recommends scaling down to [%d] tasks during rollover", rolloverTaskCount);
-        changeTaskCountInIOConfig(rolloverTaskCount);
-        // Here force reset the supervisor state to be re-calculated on the next iteration of runInternal() call.
-        // This seems the best way to inject task amount recalculation during the rollover.
-        clearPartitionAssignmentsForScaling();
-
-        ServiceMetricEvent.Builder event = ServiceMetricEvent
-            .builder()
-            .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
-            .setDimension(DruidMetrics.DATASOURCE, dataSource)
-            .setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
-
-        emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, rolloverTaskCount));
+    if (taskAutoScaler == null) {
+      // Do nothing
+    } else {
+      log.info("Computing optimal taskCount for supervisor[%s] during rollover.", supervisorId);
+      final int currentTaskCount = getCurrentTaskCount();
+      final int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
+      if (rolloverTaskCount > 0 && rolloverTaskCount != currentTaskCount) {
+        log.info(
+            "Updating taskCount for supervisor[%s] from [%d] to [%d] during rollover.",
+            supervisorId, currentTaskCount, rolloverTaskCount
+        );
+        isScalingTasksOnRollover.set(true);
+        new DynamicAllocationTasksNotice(
+            () -> rolloverTaskCount,
+            () -> isScalingTasksOnRollover.set(false),
+            emitter
+        ).handle();
       }
     }
   }
@@ -4825,6 +4892,16 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+  private ServiceMetricEvent.Builder getMetricBuilder()
+  {
+    return ServiceMetricEvent
+        .builder()
+        .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
+        .setDimension(DruidMetrics.DATASOURCE, dataSource)
+        .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
+        .setDimensionIfNotNull(DruidMetrics.TAGS, spec.getContextValue(DruidMetrics.TAGS));
+  }
+
   protected void emitLag()
   {
     SupervisorStateManager.State basicState = stateManager.getSupervisorState().getBasicState();
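A standalone sketch of the `isScalingTasksOnRollover` guard added above: the flag is raised before the rollover-triggered scaling notice runs and lowered when it completes, so the "skip scaling while tasks are pending" check is bypassed only for that window. In the patch the flag is cleared by a completion callback passed to `DynamicAllocationTasksNotice`; here a try/finally stands in for that callback, and all names are illustrative.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class RolloverScalingGuard
{
  private final AtomicBoolean isScalingOnRollover = new AtomicBoolean(false);

  /** Mirrors the notice's check that skips scaling while tasks are pending. */
  public boolean shouldSkipScaling(boolean hasPendingTasks)
  {
    // Pending tasks normally block scaling, unless this is a rollover-driven scale
    return hasPendingTasks && !isScalingOnRollover.get();
  }

  /** Mirrors maybeScaleDuringTaskRollover(): raise the flag, run, then lower it. */
  public void scaleDuringRollover(Runnable scaleAction)
  {
    isScalingOnRollover.set(true);
    try {
      scaleAction.run();
    }
    finally {
      isScalingOnRollover.set(false);
    }
  }

  public boolean isFlagSet()
  {
    return isScalingOnRollover.get();
  }
}
```

The flag must be cleared on every path out of the scale action, otherwise later notices would wrongly ignore pending tasks.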
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 9386ac365f8..633bd9b70dc 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.seekablestream.supervisor;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
@@ -88,10 +89,12 @@ public abstract class SeekableStreamSupervisorIOConfig
     // Could be null
     this.autoScalerConfig = autoScalerConfig;
     this.autoScalerEnabled = autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler();
-    // if autoscaler is enabled, then taskCount will be ignored here and initial taskCount will equal to taskCountStart/taskCountMin
     if (autoScalerEnabled) {
-      final Integer startTaskCount = autoScalerConfig.getTaskCountStart();
-      this.taskCount = startTaskCount != null ? startTaskCount : autoScalerConfig.getTaskCountMin();
+      // Priority: taskCountStart > taskCount > taskCountMin
+      this.taskCount = Configs.valueOrDefault(
+          autoScalerConfig.getTaskCountStart(),
+          Configs.valueOrDefault(taskCount, autoScalerConfig.getTaskCountMin())
+      );
    } else {
       this.taskCount = taskCount != null ? taskCount : 1;
     }
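A standalone sketch of the corrected initial taskCount resolution when the autoscaler is enabled: `taskCountStart` takes priority, then the explicit `taskCount`, then `taskCountMin`. The local `valueOrDefault` below is an illustrative stand-in for `org.apache.druid.common.config.Configs.valueOrDefault`.

```java
public class TaskCountResolution
{
  // Stand-in for Configs.valueOrDefault: return value if non-null, else the default
  static <T> T valueOrDefault(T value, T defaultValue)
  {
    return value != null ? value : defaultValue;
  }

  // Priority: taskCountStart > taskCount > taskCountMin
  public static int resolveInitialTaskCount(Integer taskCountStart, Integer taskCount, int taskCountMin)
  {
    return valueOrDefault(taskCountStart, valueOrDefault(taskCount, taskCountMin));
  }
}
```

The old code skipped the explicit `taskCount` entirely and fell straight through to `taskCountMin` when `taskCountStart` was absent.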
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 9c42a6f35c9..20da7ab7901 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -29,6 +29,7 @@ import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAu
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Either;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -62,10 +63,17 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
   public static final String LAG_COST_METRIC = "task/autoScaler/costBased/lagCost";
   public static final String IDLE_COST_METRIC = "task/autoScaler/costBased/idleCost";
   public static final String OPTIMAL_TASK_COUNT_METRIC = "task/autoScaler/costBased/optimalTaskCount";
+  public static final String INVALID_METRICS_COUNT = "task/autoScaler/costBased/invalidMetrics";
 
   static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
   static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
 
+  /**
+   * If average partition lag crosses this value and the processing rate is
+   * still zero, scaling actions are skipped and an alert is raised.
+   */
+  static final int MAX_IDLENESS_PARTITION_LAG = 10_000;
+
   /**
    * Divisor for partition count in the K formula: K = (partitionCount / K_PARTITION_DIVISOR) / sqrt(currentTaskCount).
    * This controls how aggressive the scaling is relative to partition count.
@@ -78,7 +86,6 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
   private final ServiceEmitter emitter;
   private final SupervisorSpec spec;
   private final CostBasedAutoScalerConfig config;
-  private final ServiceMetricEvent.Builder metricBuilder;
   private final ScheduledExecutorService autoscalerExecutor;
   private final WeightedCostFunction costFunction;
   private volatile CostMetrics lastKnownMetrics;
@@ -101,7 +108,11 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
     this.costFunction = new WeightedCostFunction();
     this.autoscalerExecutor = Execs.scheduledSingleThreaded("CostBasedAutoScaler-"
                                                             + StringUtils.encodeForFormat(spec.getId()));
-    this.metricBuilder =
+  }
+
+  private ServiceMetricEvent.Builder getMetricBuilder()
+  {
+    return
         ServiceMetricEvent.builder()
                           .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
                           .setDimension(
@@ -161,29 +172,26 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
   public int computeTaskCountForScaleAction()
   {
     lastKnownMetrics = collectMetrics();
-    if (lastKnownMetrics == null) {
-      log.debug("Metrics not available for supervisorId [%s], skipping scaling action", supervisorId);
-      return -1;
-    }
 
     final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
-    final int currentTaskCount = lastKnownMetrics.getCurrentTaskCount();
+    final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
 
     // Perform scale-up actions; scale-down actions only if configured.
-    int taskCount = -1;
+    final int taskCount;
     if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
       taskCount = optimalTaskCount;
       lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
-      log.info("New task count [%d] on supervisor [%s], scaling up", taskCount, supervisorId);
+      log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount);
     } else if (!config.isScaleDownOnTaskRolloverOnly()
                && isScaleActionAllowed()
                && optimalTaskCount < currentTaskCount
                && optimalTaskCount > 0) {
       taskCount = optimalTaskCount;
       lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
-      log.info("New task count [%d] on supervisor [%s], scaling down", taskCount, supervisorId);
+      log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale down).", supervisorId, currentTaskCount, taskCount);
     } else {
-      log.info("No scaling required for supervisor [%s]", supervisorId);
+      taskCount = -1;
+      log.debug("No scaling required for supervisor[%s]", supervisorId);
     }
     return taskCount;
   }
@@ -206,8 +214,14 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
    */
   int computeOptimalTaskCount(CostMetrics metrics)
   {
-    if (metrics == null) {
-      log.debug("No metrics available yet for supervisorId [%s]", supervisorId);
+    final Either<String, Boolean> result = validateMetricsForScaling(metrics);
+    if (result.isError()) {
+      log.debug("Valid metrics are not yet available for scaling supervisor[%s]", supervisorId);
+      emitter.emit(
+          getMetricBuilder()
+              .setDimension(DruidMetrics.DESCRIPTION, result.error())
+              .setMetric(INVALID_METRICS_COUNT, 1L)
+      );
       return -1;
     }
 
@@ -228,55 +242,52 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
     );
 
     if (validTaskCounts.length == 0) {
-      log.warn("No valid task counts after applying constraints for supervisorId [%s]", supervisorId);
+      log.warn("No valid task counts after applying constraints for supervisor[%s]", supervisorId);
       return -1;
     }
 
-    int optimalTaskCount = -1;
-    CostResult optimalCost = new CostResult();
+    // Start with the current task count as optimal
+    int optimalTaskCount = currentTaskCount;
+    CostResult optimalCost = costFunction.computeCost(metrics, currentTaskCount, config);
 
     log.info(
-        "Current metrics: avgPartitionLag[%.1f], pollIdleRatio[%.1f], lagWeight[%.1f], idleWeight[%.1f]",
-        metrics.getAggregateLag(),
+        "Computing optimal taskCount for supervisor[%s] with metrics:"
+        + " currentTaskCount[%d], avgPartitionLag[%.1f], avgProcessingRate[%.1f],"
+        + " pollIdleRatio[%.1f], lagWeight[%.1f], idleWeight[%.1f].",
+        supervisorId,
+        currentTaskCount,
+        metrics.getAvgPartitionLag(),
+        metrics.getAvgProcessingRate(),
         metrics.getPollIdleRatio(),
         config.getLagWeight(),
         config.getIdleWeight()
     );
 
-    for (int taskCount : validTaskCounts) {
+    // Find the task count which reduces cost
+    final CostResult[] costResults = new CostResult[validTaskCounts.length];
+    for (int i = 0; i < validTaskCounts.length; ++i) {
+      final int taskCount = validTaskCounts[i];
       CostResult costResult = costFunction.computeCost(metrics, taskCount, config);
       double cost = costResult.totalCost();
 
-      log.info(
-          "Proposed task count[%d] has total cost[%.4f] = lagCost[%.4f] + idleCost[%.4f].",
-          taskCount,
-          cost,
-          costResult.lagCost(),
-          costResult.idleCost()
-      );
+      costResults[i] = costResult;
       if (cost < optimalCost.totalCost()) {
         optimalTaskCount = taskCount;
         optimalCost = costResult;
       }
     }
 
-    emitter.emit(metricBuilder.setMetric(OPTIMAL_TASK_COUNT_METRIC, (long) optimalTaskCount));
-    emitter.emit(metricBuilder.setMetric(LAG_COST_METRIC, optimalCost.lagCost()));
-    emitter.emit(metricBuilder.setMetric(IDLE_COST_METRIC, optimalCost.idleCost()));
-
-    log.debug(
-        "Cost-based scaling evaluation for supervisorId [%s]: current=%d, optimal=%d, cost=%.4f, "
-        + "avgPartitionLag=%.2f, pollIdleRatio=%.3f",
-        supervisorId,
-        metrics.getCurrentTaskCount(),
-        optimalTaskCount,
-        optimalCost.totalCost(),
-        metrics.getAvgPartitionLag(),
-        metrics.getPollIdleRatio()
-    );
+    emitter.emit(getMetricBuilder().setMetric(OPTIMAL_TASK_COUNT_METRIC, (long) optimalTaskCount));
+    emitter.emit(getMetricBuilder().setMetric(LAG_COST_METRIC, optimalCost.lagCost()));
+    emitter.emit(getMetricBuilder().setMetric(IDLE_COST_METRIC, optimalCost.idleCost()));
 
     if (optimalTaskCount == currentTaskCount) {
       return -1;
+    } else {
+      log.info(
+          "Optimal taskCount[%d] for supervisor[%s] has lowest cost[%.4f] out of the following candidates: %n%s",
+          optimalTaskCount, supervisorId, optimalCost.totalCost(), constructCostTable(validTaskCounts, costResults)
+      );
     }
 
     // Scale up is performed eagerly.
@@ -474,9 +485,12 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
     }
 
     final LagStats lagStats = supervisor.computeLagStats();
+    final double avgPartitionLag;
     if (lagStats == null) {
       log.debug("Lag stats unavailable for supervisorId [%s], skipping collection", supervisorId);
-      return null;
+      avgPartitionLag = -1;
+    } else {
+      avgPartitionLag = lagStats.getAvgLag();
     }
 
     final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
@@ -484,12 +498,7 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
 
     final Map<String, Map<String, Object>> taskStats = supervisor.getStats();
     final double movingAvgRate = extractMovingAverage(taskStats);
-    // If moving average is not available, we stop scaling effort.
-    if (movingAvgRate < 0) {
-      return null;
-    }
     final double pollIdleRatio = extractPollIdleRatio(taskStats);
-    final double avgPartitionLag = lagStats.getAvgLag();
 
     return new CostMetrics(
         avgPartitionLag,
@@ -501,6 +510,56 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
     );
   }
 
+  private static String constructCostTable(int[] taskCounts, CostResult[] results)
+  {
+    final StringBuilder table = new StringBuilder();
+    table.append("Task count ");
+    for (int taskCount : taskCounts) {
+      table.append(StringUtils.format("| %8d", taskCount));
+    }
+    table.append("\nLag cost   ");
+    for (CostResult result : results) {
+      table.append(StringUtils.format("| %8.1f", result.lagCost()));
+    }
+    table.append("\nIdle cost  ");
+    for (CostResult result : results) {
+      table.append(StringUtils.format("| %8.1f", result.idleCost()));
+    }
+    table.append("\nTotal cost ");
+    for (CostResult result : results) {
+      table.append(StringUtils.format("| %8.1f", result.totalCost()));
+    }
+
+    return table.toString();
+  }
+
+  /**
+   * Checks if the given metrics are valid for auto-scaling. If they are not
+   * valid, auto-scaling will be skipped until fresh metrics are available.
+   *
+   * @return Either an error or a success boolean.
+   */
+  private Either<String, Boolean> validateMetricsForScaling(CostMetrics metrics)
+  {
+    if (metrics == null) {
+      return Either.error("No metrics collected");
+    } else if (metrics.getAvgProcessingRate() < 0 || metrics.getPollIdleRatio() < 0) {
+      return Either.error("Task metrics not available");
+    } else if (metrics.getCurrentTaskCount() <= 0 || metrics.getPartitionCount() <= 0) {
+      return Either.error("Supervisor metrics not available");
+    } else if (metrics.getAvgPartitionLag() < 0) {
+      return Either.error("Lag metrics not available");
+    } else if (metrics.getAvgProcessingRate() < 1 && metrics.getAvgPartitionLag() > MAX_IDLENESS_PARTITION_LAG) {
+      log.makeAlert(
+          "Supervisor[%s] has high partition lag[%.0f] but processing rate is zero. Check if the tasks are stuck.",
+          supervisorId, metrics.getAvgPartitionLag()
+      ).emit();
+      return Either.error("Lag is high but processing rate is zero");
+    } else {
+      return Either.value(true);
+    }
+  }
+
+
   /**
   * Determines if a scale action is currently allowed based on the elapsed time
    * since the last scale action and the configured minimum scale-down delay.
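A standalone sketch of the spurious-scale-down fix in `computeOptimalTaskCount()`: instead of starting the search from an infinite-cost sentinel (the removed `CostResult` default constructor), the current task count is evaluated first and used as the baseline, so another candidate wins only if it is strictly cheaper. The cost function here is an illustrative stand-in for `WeightedCostFunction`.

```java
import java.util.function.IntToDoubleFunction;

public class BaselineCostSearch
{
  /**
   * @return the candidate with the lowest cost, or currentTaskCount if no
   * candidate strictly beats it (which callers treat as "no scaling needed").
   */
  public static int findOptimalTaskCount(
      int currentTaskCount,
      int[] candidates,
      IntToDoubleFunction costOf
  )
  {
    // Baseline: the cost of staying at the current task count
    int optimal = currentTaskCount;
    double optimalCost = costOf.applyAsDouble(currentTaskCount);
    for (int candidate : candidates) {
      double cost = costOf.applyAsDouble(candidate);
      if (cost < optimalCost) {
        optimal = candidate;
        optimalCost = cost;
      }
    }
    return optimal;
  }
}
```

With the old infinite baseline, a flat cost curve could make the first (smallest) candidate "win" and trigger a needless scale down; seeding with the current count removes that bias.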
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostMetrics.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostMetrics.java
index 8204ff0b6e5..8938ad11b96 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostMetrics.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostMetrics.java
@@ -34,7 +34,7 @@ public class CostMetrics
   private final double pollIdleRatio;
   private final long taskDurationSeconds;
   private final double avgProcessingRate;
-  private double aggregateLag;
+  private final double aggregateLag;
 
   public CostMetrics(
       double avgPartitionLag,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
index 42096dd61e1..1ad946ff517 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
@@ -25,16 +25,10 @@ package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
  */
 public class CostResult
 {
-
   private final double totalCost;
   private final double lagCost;
   private final double idleCost;
 
-  public CostResult()
-  {
-    this(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY);
-  }
-
   /**
    * @param totalCost the weighted sum of lagCost and idleCost
   * @param lagCost   the weighted cost representing expected time (seconds) to recover current lag
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
index 0f5ea6083c7..8fe4f9d8d06 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
 
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.logger.Logger;
 
 /**
@@ -35,6 +36,13 @@ public class WeightedCostFunction
    */
   static final double LAG_AMPLIFICATION_MULTIPLIER = 0.05;
 
+  /**
+   * Minimum rate of processing for any task in records per second. This is used
+   * as a placeholder if avg rate is not available to ensure that cost computations
+   * do not return infinitely large lag recovery times.
+   */
+  static final double MIN_PROCESSING_RATE = 1_000;
+
   /**
    * Computes cost for a given task count using compute time metrics.
    * <p>
@@ -62,14 +70,8 @@ public class WeightedCostFunction
 
     final double avgProcessingRate = metrics.getAvgProcessingRate();
     final double lagRecoveryTime;
-    if (avgProcessingRate <= 0) {
-      // Metrics are unavailable - favor maintaining the current task count.
-      // We're conservative about scale up, but won't let an unlikey scale down to happen.
-      if (proposedTaskCount == metrics.getCurrentTaskCount()) {
-        return new CostResult(0.01d, 0.0, 0.0);
-      } else {
-        return new CostResult(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY);
-      }
+    if (avgProcessingRate < 0) {
+      throw DruidException.defensive("Avg processing rate[%.2f] must not be negative.", avgProcessingRate);
     } else {
       // Lag recovery time is decreasing by adding tasks and increasing by ejecting tasks.
       // In case of increasing lag, we apply an amplification factor to reflect the urgency of addressing lag.
@@ -79,7 +81,8 @@ public class WeightedCostFunction
       } else {
         final double lagPerPartition = metrics.getAggregateLag() / metrics.getPartitionCount();
         final double amplification = Math.max(1.0, 1.0 + LAG_AMPLIFICATION_MULTIPLIER * Math.log(lagPerPartition));
-        lagRecoveryTime = metrics.getAggregateLag() * amplification / (proposedTaskCount * avgProcessingRate);
+        final double adjustedProcessingRate = Math.max(avgProcessingRate, MIN_PROCESSING_RATE);
+        lagRecoveryTime = metrics.getAggregateLag() * amplification / (proposedTaskCount * adjustedProcessingRate);
       }
     }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
index a64ca5a6b6b..4377452ec5d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
@@ -203,7 +203,7 @@ public class SegmentTransactionalInsertActionTest
     );
 
     Assert.assertEquals(
-        SegmentPublishResult.retryableFailure(
+        SegmentPublishResult.fail(
             "The new start metadata state[ObjectMetadata{theObject=[1]}] is"
             + " ahead of the last committed end state[null]. Try resetting the supervisor."
         ),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
index b39e6097903..0bf697a2a09 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
@@ -23,11 +23,14 @@ import com.google.common.base.Optional;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
+import org.apache.druid.query.DruidMetrics;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.List;
+
 public class SeekableStreamSupervisorScaleDuringTaskRolloverTest extends SeekableStreamSupervisorTestBase
 {
   private static final int DEFAULT_TASK_COUNT = 10;
@@ -45,7 +48,7 @@ public class SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
   }
 
   @Test
-  public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale()
+  public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale() throws Exception
   {
     // Given
     setupSpecExpectations(createIOConfig(DEFAULT_TASK_COUNT, null));
@@ -65,7 +68,7 @@ public class SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
   }
 
   @Test
-  public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotScale()
+  public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotScale() throws Exception
   {
     // Given
     setupSpecExpectations(getIOConfigWithCostBasedAutoScaler());
@@ -93,7 +96,7 @@ public class SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
   }
 
   @Test
-  public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScaling()
+  public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScaling() throws Exception
   {
     // Given
     final int targetTaskCount = 5;
@@ -102,6 +105,9 @@ public class SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
     EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject()))
             .andReturn(createMockAutoScaler(targetTaskCount))
             .anyTimes();
+    EasyMock.expect(spec.getContextValue(EasyMock.eq(DruidMetrics.TAGS)))
+            .andReturn(List.of("tag1", "tag2"))
+            .anyTimes();
     EasyMock.replay(spec);
 
     TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(100);
@@ -121,7 +127,7 @@ public class SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
   }
 
   @Test
-  public void test_maybeScaleDuringTaskRollover_rolloverCountZero_doesNotScale()
+  public void test_maybeScaleDuringTaskRollover_rolloverCountZero_doesNotScale() throws Exception
   {
     // Given
     setupSpecExpectations(getIOConfigWithCostBasedAutoScaler());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index db37e3014e3..3f534de8068 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -2760,7 +2760,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         "stream",
         null,
         1,
-        99,
+        null,
         new Period("PT1H"),
         new Period("PT1S"),
         new Period("PT30S"),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
index 45c1c2b772e..7b46c4c6558 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
@@ -32,6 +32,7 @@ import org.mockito.Mockito;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
@@ -372,6 +373,10 @@ public class CostBasedAutoScalerMockTest
         AVG_PROCESSING_RATE
     );
     doReturn(metrics).when(autoScaler).collectMetrics();
+
+    SeekableStreamSupervisorIOConfig ioConfig = mock(SeekableStreamSupervisorIOConfig.class);
+    doReturn(ioConfig).when(mockSupervisor).getIoConfig();
+    doReturn(taskCount).when(ioConfig).getTaskCount();
   }
 
   private CostMetrics createMetrics(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 86504458b75..0eea53b16ed 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -484,6 +484,7 @@ public class CostBasedAutoScalerTest
     when(spec.isSuspended()).thenReturn(false);
     when(supervisor.getIoConfig()).thenReturn(ioConfig);
     when(ioConfig.getStream()).thenReturn("test-stream");
+    when(ioConfig.getTaskDuration()).thenReturn(Duration.standardHours(1));
     when(supervisor.computeLagStats()).thenReturn(new LagStats(100, 100, 100));
     // No task stats means the moving average rate is unavailable
     when(supervisor.getStats()).thenReturn(Collections.emptyMap());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
index c4133c0093b..f96df30049e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
 
+import org.apache.druid.error.DruidException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -156,20 +157,26 @@ public class WeightedCostFunctionTest
   }
 
   @Test
-  public void testNoProcessingRateFavorsCurrentTaskCount()
+  public void testZeroProcessingRateUsesDefaultRate()
   {
-    // When the processing rate is unavailable (0), the cost function should favor
-    // maintaining the current task count, rather to scale up decisions with incomplete data.
+    // When the processing rate is zero, the cost function uses a default rate and tries to recover lag
     int currentTaskCount = 10;
-    CostMetrics metricsNoRate = createMetricsWithRate(50000.0, currentTaskCount, 100, 0.3, 0.0);
-
+    CostMetrics metricsNoRate = createMetricsWithRate(50000.0, currentTaskCount, 100, 1.0, 0.0);
+
+    final CostBasedAutoScalerConfig config = CostBasedAutoScalerConfig
+        .builder()
+        .taskCountMin(1)
+        .taskCountMax(100)
+        .idleWeight(0)
+        .lagWeight(1)
+        .build();
     double costAtCurrent = costFunction.computeCost(metricsNoRate, currentTaskCount, config).totalCost();
     double costScaleUp = costFunction.computeCost(metricsNoRate, currentTaskCount + 5, config).totalCost();
     double costScaleDown = costFunction.computeCost(metricsNoRate, currentTaskCount - 5, config).totalCost();
 
     Assert.assertTrue(
         "Cost at current should be less than cost for scale up",
-        costAtCurrent < costScaleUp
+        costAtCurrent > costScaleUp
     );
     Assert.assertTrue(
         "Cost at current should be less than cost for scale down",
@@ -178,29 +185,12 @@ public class WeightedCostFunctionTest
   }
 
   @Test
-  public void testNoProcessingRateDeviationPenaltyIsSymmetric()
+  public void testNegativeProcessingRate_throwsDefensiveException()
   {
-    // Deviation penalty should be symmetric around current task count
-    int currentTaskCount = 10;
-    CostMetrics metricsNoRate = createMetricsWithRate(50000.0, currentTaskCount, 100, 0.5, 0.0);
-
-    // Use lag-only config to isolate the lag recovery time component
-    CostBasedAutoScalerConfig lagOnlyConfig = CostBasedAutoScalerConfig.builder()
-                                                                       .taskCountMax(100)
-                                                                       .taskCountMin(1)
-                                                                       .enableTaskAutoScaler(true)
-                                                                       .lagWeight(1.0)
-                                                                       .idleWeight(0.0)
-                                                                       .build();
-
-    double costUp5 = costFunction.computeCost(metricsNoRate, currentTaskCount + 5, lagOnlyConfig).totalCost();
-    double costDown5 = costFunction.computeCost(metricsNoRate, currentTaskCount - 5, lagOnlyConfig).totalCost();
-
-    Assert.assertEquals(
-        "Lag cost for +5 and -5 deviation should be equal",
-        costUp5,
-        costDown5,
-        0.001
+    final CostMetrics metrics = createMetricsWithRate(50000.0, 1, 100, 0.5, -1);
+    Assert.assertThrows(
+        DruidException.class,
+        () -> costFunction.computeCost(metrics, 2, config)
     );
   }
 
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
index eb84b50399a..74e65f1228b 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
@@ -33,8 +33,12 @@ import java.util.function.Function;
 
 public abstract class TransactionalSegmentPublisher
 {
-  private static final int QUIET_RETRIES = 3;
-  private static final int MAX_RETRIES = 5;
+  private static final int QUIET_RETRIES = 5;
+
+  /**
+   * Approximately 10 minutes of retrying using {@link RetryUtils#nextRetrySleepMillis(int)}.
+   */
+  private static final int MAX_RETRIES = 13;
 
   /**
   * Publish segments, along with some commit metadata, in a single transaction.
@@ -104,7 +108,7 @@ public abstract class TransactionalSegmentPublisher
   /**
    * Sleeps until the next attempt.
    */
-  private static void awaitNextRetry(SegmentPublishResult lastResult, int attemptCount)
+  void awaitNextRetry(SegmentPublishResult lastResult, int attemptCount)
   {
     try {
       RetryUtils.awaitNextRetry(
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java
index 884b475893d..c4eec7106a7 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java
@@ -46,7 +46,7 @@ public class TransactionalSegmentPublisherTest
         SegmentPublishResult.retryableFailure("this error is retryable"),
         publisher.publishSegments(null, Set.of(), Function.identity(), null, null)
     );
-    Assert.assertEquals(6, attemptCount.get());
+    Assert.assertEquals(14, attemptCount.get());
   }
 
   @Test
@@ -99,6 +99,12 @@ public class TransactionalSegmentPublisherTest
         attemptCount.incrementAndGet();
         return publishResult;
       }
+
+      @Override
+      void awaitNextRetry(SegmentPublishResult lastResult, int attemptCount)
+      {
+        // return immediately
+      }
     };
   }
 }
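A note on the enlarged retry budget in TransactionalSegmentPublisher above (QUIET_RETRIES 3 -> 5, MAX_RETRIES 5 -> 13): the new javadoc pegs 13 retries at roughly 10 minutes. A standalone sketch of that arithmetic, assuming a backoff that starts at 1 second, doubles per attempt, and caps at 60 seconds; the real RetryUtils.nextRetrySleepMillis also adds random fuzz and its exact constants may differ:

```java
// Hypothetical backoff constants for illustration only; RetryUtils'
// actual values and fuzzing are not reproduced here.
public class RetryBudgetSketch
{
  static final long BASE_SLEEP_MILLIS = 1_000;
  static final long MAX_SLEEP_MILLIS = 60_000;

  /** Sleep before the given 1-based attempt: doubles each attempt, capped. */
  static long nextRetrySleepMillis(int attempt)
  {
    final long uncapped = BASE_SLEEP_MILLIS << Math.min(attempt - 1, 30);
    return Math.min(uncapped, MAX_SLEEP_MILLIS);
  }

  public static void main(String[] args)
  {
    long totalMillis = 0;
    for (int attempt = 1; attempt <= 13; attempt++) {
      totalMillis += nextRetrySleepMillis(attempt);
    }
    // 1+2+4+8+16+32 seconds for the first six attempts, then 60s each
    System.out.println("Total sleep across 13 retries: " + (totalMillis / 1000) + "s");
  }
}
```

Under these assumed constants the total is about 8 minutes of sleep, which with fuzz and the publish attempts themselves lands in the ballpark of the ~10 minutes the javadoc claims.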


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
