This is an automated email from the ASF dual-hosted git repository.

gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0920a935591 fix: Precedence of taskCount, taskCountStart, 
taskCountMin. (#19417)
0920a935591 is described below

commit 0920a93559116bfd401618d56854b1c0d5a31f14
Author: Gian Merlino <[email protected]>
AuthorDate: Wed May 6 09:49:07 2026 -0700

    fix: Precedence of taskCount, taskCountStart, taskCountMin. (#19417)
    
    PR #18745 included a discussion of desired behavior of taskCount,
    taskCountStart, and taskCountMin, but this has not been fully
    implemented. This patch updates logic to align with the intent.
---
 docs/ingestion/supervisor.md                       |   2 +-
 .../supervisor/KafkaSupervisorIOConfigTest.java    |  47 +++++++-
 .../kafka/supervisor/KafkaSupervisorSpecTest.java  |  43 +++++++
 .../overlord/supervisor/SupervisorManager.java     |   4 +-
 .../SeekableStreamSupervisorIOConfig.java          |  34 ++++--
 .../supervisor/SeekableStreamSupervisorSpec.java   |  46 ++++----
 .../supervisor/SupervisorResourceTest.java         |  97 ++++++----------
 .../SeekableStreamSupervisorIOConfigTest.java      |  73 ++++++------
 .../SeekableStreamSupervisorSpecTest.java          | 124 ++++++++++++---------
 .../SeekableStreamSupervisorStateTest.java         |  12 +-
 .../SeekableStreamSupervisorTestBase.java          |  20 +++-
 .../overlord/supervisor/SupervisorSpec.java        |   6 +-
 12 files changed, 307 insertions(+), 201 deletions(-)

diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 4e59f0fa542..19665242081 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -78,7 +78,7 @@ The following table outlines the configuration properties for 
`autoScalerConfig`
 |`enableTaskAutoScaler`|Enables the autoscaler. If not specified, Druid 
disables the autoscaler even when `autoScalerConfig` is not null.|No|`false`|
 |`taskCountMax`|The maximum number of ingestion tasks. Must be greater than or 
equal to `taskCountMin`. If `taskCountMax` is greater than the number of Kafka 
partitions or Kinesis shards, Druid sets the maximum number of reading tasks to 
the number of Kafka partitions or Kinesis shards and ignores 
`taskCountMax`.|Yes||
 |`taskCountMin`|The minimum number of ingestion tasks. When you enable the 
autoscaler, Druid computes the initial number of tasks to launch by checking 
the configs in the following order: `taskCountStart`, then `taskCount` (in 
`ioConfig`), then `taskCountMin`.|Yes||
-|`taskCountStart`|Optional config to specify the number of ingestion tasks to 
start with. When you enable the autoscaler, Druid computes the initial number 
of tasks to launch by checking the configs in the following order: 
`taskCountStart`, then `taskCount` (in `ioConfig`), then 
`taskCountMin`.|No|`taskCount` or `taskCountMin`|
+|`taskCountStart`|Optional config to specify the number of ingestion tasks to 
start with. If `taskCountStart` is provided on POST of a supervisor, it takes 
priority and the `taskCount` is reset to `taskCountStart` at that 
time.|No|`taskCount` or `taskCountMin`|
 |`minScaleUpDelay`|Minimum cooldown duration between scale-up actions, 
specified as an ISO-8601 duration string. Falls back to 
`minTriggerScaleActionFrequencyMillis` if not set.|No||
 |`minScaleDownDelay`|Minimum cooldown duration between scale-down actions, 
specified as an ISO-8601 duration string. Falls back to 
`minTriggerScaleActionFrequencyMillis` if not set.|No||
 |`minTriggerScaleActionFrequencyMillis`|**Deprecated.** Use `minScaleUpDelay` 
and `minScaleDownDelay` instead. Minimum time interval in milliseconds between 
scale actions, used as the fallback when the Duration-based fields are not 
set.|No|600000|
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 289d3c989f3..264c37f9913 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -381,7 +381,7 @@ public class KafkaSupervisorIOConfigTest
         false,
         null
     );
-    Assert.assertEquals(5, kafkaSupervisorIOConfig.getTaskCount().intValue());
+    Assert.assertEquals(1, kafkaSupervisorIOConfig.getTaskCount());
 
     Assert.assertThrows(
         "taskCountMin <= taskCountStart <= taskCountMax",
@@ -400,6 +400,51 @@ public class KafkaSupervisorIOConfigTest
     );
   }
 
+  @Test
+  public void testTaskCountStartFallbackAndExplicitFlag()
+  {
+    final Map<String, Object> autoScalerConfig = ImmutableMap.of(
+        "enableTaskAutoScaler", true,
+        "taskCountMin", 1,
+        "taskCountMax", 10,
+        "taskCountStart", 5
+    );
+
+    Assert.assertEquals(7, makeIOConfig(7, autoScalerConfig).getTaskCount());
+    Assert.assertTrue(makeIOConfig(7, autoScalerConfig).isTaskCountExplicit());
+
+    Assert.assertEquals(5, makeIOConfig(null, 
autoScalerConfig).getTaskCount());
+    Assert.assertFalse(makeIOConfig(null, 
autoScalerConfig).isTaskCountExplicit());
+  }
+
+  private KafkaSupervisorIOConfig makeIOConfig(Integer taskCount, Map<String, 
Object> autoScalerConfig)
+  {
+    return new KafkaSupervisorIOConfig(
+        "test",
+        null,
+        null,
+        1,
+        taskCount,
+        new Period("PT1H"),
+        ImmutableMap.of("bootstrap.servers", "localhost:8082"),
+        mapper.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class),
+        LagAggregator.DEFAULT,
+        KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+        new Period("P1D"),
+        new Period("PT30S"),
+        true,
+        new Period("PT30M"),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        false,
+        null
+    );
+  }
+
   @Test
   public void testIdleConfigSerde() throws JsonProcessingException
   {
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
index 4f6088a8868..8879ff6d975 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -30,7 +30,10 @@ import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
+import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -422,6 +425,46 @@ public class KafkaSupervisorSpecTest
     Assert.assertFalse(runningSpec.isSuspended());
   }
 
+  @Test
+  public void testTaskCountSerdeRoundTrip() throws IOException
+  {
+    // A persisted taskCount must survive a serialize/deserialize round-trip 
even when
+    // autoScalerConfig.taskCountStart is set.
+    final CostBasedAutoScalerConfig autoScalerConfig =
+        CostBasedAutoScalerConfig.builder()
+            .enableTaskAutoScaler(true)
+            .taskCountMin(1)
+            .taskCountMax(100)
+            .taskCountStart(25)
+            .build();
+
+    final KafkaSupervisorSpec spec = new KafkaSupervisorSpecBuilder()
+        .withDataSchema(
+            schema -> schema
+                .withTimestamp(TimestampSpec.DEFAULT)
+                .withAggregators(new CountAggregatorFactory("rows"))
+                .withGranularity(new UniformGranularitySpec(Granularities.DAY, 
Granularities.NONE, null))
+        )
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withJsonInputFormat()
+                .withConsumerProperties(Map.of("bootstrap.servers", 
"localhost:9092"))
+                .withTaskCount(25)
+                .withAutoScalerConfig(autoScalerConfig)
+                .withLagAggregator(LagAggregator.DEFAULT)
+        )
+        .build("testDs", "metrics");
+
+    // Mutate taskCount the same way 
SeekableStreamSupervisor.changeTaskCountInIOConfig does,
+    // and verify that the mutation is picked up by serialization.
+    spec.getIoConfig().setTaskCount(50);
+    final byte[] payload = mapper.writeValueAsBytes(spec);
+    final KafkaSupervisorSpec roundTripped =
+        (KafkaSupervisorSpec) mapper.readValue(payload, SupervisorSpec.class);
+    Assert.assertEquals(50, roundTripped.getIoConfig().getTaskCount());
+    Assert.assertTrue(roundTripped.getIoConfig().isTaskCountExplicit());
+  }
+
   @Test
   public void test_validateSpecUpdateTo()
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 51abb814a6d..52f3cba7fc1 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -208,9 +208,7 @@ public class SupervisorManager implements 
SupervisorStatsProvider
       Preconditions.checkState(started, "SupervisorManager not started");
       final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
       SupervisorSpec existingSpec = 
possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
-      if (existingSpec != null) {
-        spec.merge(existingSpec);
-      }
+      spec.merge(existingSpec);
       createAndStartSupervisorInternal(spec, shouldUpdateSpec);
       return shouldUpdateSpec;
     }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 421a885b294..cf7b27a6f0c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -19,12 +19,14 @@
 
 package org.apache.druid.indexing.seekablestream.supervisor;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.druid.common.config.Configs;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.java.util.common.IAE;
 import org.joda.time.DateTime;
@@ -41,7 +43,14 @@ public abstract class SeekableStreamSupervisorIOConfig
   @Nullable
   private final InputFormat inputFormat; // nullable for backward compatibility
   private final Integer replicas;
-  private Integer taskCount;
+  private int taskCount;
+  /**
+   * Whether {@link #taskCount} was explicitly provided to the constructor. 
After a serde round-trip,
+   * this will always be true, because the serialized form always includes 
taskCount. Its purpose is
+   * to allow {@link SeekableStreamSupervisorSpec#merge(SupervisorSpec)} to 
tell if a user-submitted
+   * spec had an explicit taskCount or not.
+   */
+  private final boolean taskCountExplicit;
   private final Duration taskDuration;
   private final Duration startDelay;
   private final Duration period;
@@ -90,16 +99,19 @@ public abstract class SeekableStreamSupervisorIOConfig
     this.autoScalerConfig = autoScalerConfig;
     boolean isAutoScalerAvailable = autoScalerConfig != null;
     this.autoScalerEnabled = isAutoScalerAvailable && 
autoScalerConfig.getEnableTaskAutoScaler();
-    if (autoScalerEnabled) {
-      // Priority: taskCountStart > taskCount > taskCountMin
+    this.taskCountExplicit = taskCount != null;
+    if (taskCount != null) {
+      // Always retain taskCount when deserializing. Note: taskCountStart 
takes precedence over taskCount
+      // in SeekableStreamSupervisorSpec#merge, to ensure that when a 
supervisor is explicitly POSTed, taskCount
+      // is reset to taskCountStart.
+      this.taskCount = taskCount;
+    } else if (autoScalerEnabled) {
       this.taskCount = Configs.valueOrDefault(
           autoScalerConfig.getTaskCountStart(),
-          Configs.valueOrDefault(taskCount, autoScalerConfig.getTaskCountMin())
+          autoScalerConfig.getTaskCountMin()
       );
-    } else if (isAutoScalerAvailable) {
-      this.taskCount = Configs.valueOrDefault(taskCount, 
autoScalerConfig.getTaskCountMin());
     } else {
-      this.taskCount = Configs.valueOrDefault(taskCount, 1);
+      this.taskCount = 1;
     }
     Preconditions.checkArgument(stopTaskCount == null || stopTaskCount > 0,
                                 "stopTaskCount must be greater than 0");
@@ -204,7 +216,7 @@ public abstract class SeekableStreamSupervisorIOConfig
   }
 
   @JsonProperty
-  public Integer getTaskCount()
+  public int getTaskCount()
   {
     return taskCount;
   }
@@ -214,6 +226,12 @@ public abstract class SeekableStreamSupervisorIOConfig
     this.taskCount = taskCount;
   }
 
+  @JsonIgnore
+  public boolean isTaskCountExplicit()
+  {
+    return taskCountExplicit;
+  }
+
   @JsonProperty
   public Duration getTaskDuration()
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index f21e073f6c4..fefa24a5f3f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -43,7 +43,6 @@ import 
org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 
 import javax.annotation.Nullable;
-import javax.validation.constraints.NotNull;
 import java.util.List;
 import java.util.Map;
 
@@ -261,30 +260,37 @@ public abstract class SeekableStreamSupervisorSpec 
implements SupervisorSpec
     }
   }
 
+  /**
+   * Updates {@link SeekableStreamSupervisorIOConfig#getTaskCount()} on this 
user-submitted spec
+   * to the desired value. The rules applied are:
+   *
+   * <ol>
+   *   <li>If the autoscaler is enabled and {@code taskCountStart} is set on this user-submitted spec, use 
it.</li>
+   *   <li>Otherwise, if {@code taskCount} is set on this user-submitted spec, 
use it.</li>
+   *   <li>Otherwise, use the existing spec's {@code taskCount}.</li>
+   * </ol>
+   */
   @Override
-  public void merge(@NotNull SupervisorSpec existingSpec)
+  public void merge(@Nullable SupervisorSpec existingSpec)
   {
-    AutoScalerConfig thisAutoScalerConfig = 
this.getIoConfig().getAutoScalerConfig();
-    // Either if autoscaler is absent or taskCountStart is specified - just 
return.
-    if (thisAutoScalerConfig == null || 
thisAutoScalerConfig.getTaskCountStart() != null) {
+    // Use this spec's taskCountStart if set.
+    final AutoScalerConfig thisAutoScalerConfig = 
getIoConfig().getAutoScalerConfig();
+    if (thisAutoScalerConfig != null
+        && thisAutoScalerConfig.getEnableTaskAutoScaler()
+        && thisAutoScalerConfig.getTaskCountStart() != null) {
+      getIoConfig().setTaskCount(thisAutoScalerConfig.getTaskCountStart());
+      return;
+    }
+
+    // Use this spec's taskCount if set.
+    if (getIoConfig().isTaskCountExplicit()) {
       return;
     }
 
-    // Use a switch expression with pattern matching when we move to Java 21 
as a minimum requirement.
-    if (existingSpec instanceof SeekableStreamSupervisorSpec) {
-      SeekableStreamSupervisorSpec spec = (SeekableStreamSupervisorSpec) 
existingSpec;
-      AutoScalerConfig autoScalerConfig = 
spec.getIoConfig().getAutoScalerConfig();
-      if (autoScalerConfig == null) {
-        return;
-      }
-      // provided `taskCountStart` > provided `taskCount` > existing 
`taskCount` > provided `taskCountMin`.
-      int taskCount = thisAutoScalerConfig.getTaskCountMin();
-      if (this.getIoConfig().getTaskCount() != null) {
-        taskCount = this.getIoConfig().getTaskCount();
-      } else if (spec.getIoConfig().getTaskCount() != null) {
-        taskCount = spec.getIoConfig().getTaskCount();
-      }
-      this.getIoConfig().setTaskCount(taskCount);
+    // Use the existing spec's taskCount. If it isn't there, we'll fall back 
to this spec's taskCount. Because there's
+    // no taskCountStart (and taskCount hasn't been explicitly set) this 
spec's taskCount will be taskCountMin or 1.
+    if (existingSpec instanceof SeekableStreamSupervisorSpec 
existingSeekableStreamSpec) {
+      
getIoConfig().setTaskCount(existingSeekableStreamSpec.getIoConfig().getTaskCount());
     }
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index bb7581ee874..4ccf4659994 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -37,7 +37,7 @@ import 
org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMeta
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
-import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTestBase;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
@@ -1394,36 +1394,44 @@ public class SupervisorResourceTest extends 
EasyMockSupport
   }
 
   @Test
-  public void 
testSpecPostMergeUsesExistingTaskCountHigherPriorityHasBeenMissed()
+  public void testSpecPostMergeUsesExistingTaskCountWhenNewSpecHasNone()
   {
-    // New spec has no taskCount -> should use existing taskCount (5)
-    TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(5, 1);
-    TestSeekableStreamSupervisorSpec newSpec = 
createTestSpecWithExpectedMerge(null, 2, 5);
+    // New spec has no taskCount -> should carry forward existing taskCount 
(5).
+    final TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(5, 1);
+    final TestSeekableStreamSupervisorSpec newSpec = createTestSpec(null, 2);
 
     newSpec.merge(existingSpec);
-    EasyMock.verify(newSpec.getIoConfig());
+
+    Assert.assertEquals(5, newSpec.getIoConfig().getTaskCount());
   }
 
   @Test
   public void testSpecPostMergeUsesProvidedTaskCountOverExistingTaskCount()
   {
-    // New spec has taskCount=3 -> should use provided taskCount over existing 
(5)
-    TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(5, 1);
-    TestSeekableStreamSupervisorSpec newSpec = 
createTestSpecWithExpectedMerge(3, 2, 3);
+    // New spec has taskCount=3 -> should keep provided taskCount over 
existing (5).
+    final TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(5, 1);
+    final TestSeekableStreamSupervisorSpec newSpec = createTestSpec(3, 2);
 
     newSpec.merge(existingSpec);
-    EasyMock.verify(newSpec.getIoConfig());
+
+    Assert.assertEquals(3, newSpec.getIoConfig().getTaskCount());
   }
 
   @Test
-  public void testSpecPostMergeFallsBackToProvidedTaskCountMin()
+  public void 
testSpecPostMergeCarriesForwardEvenWhenExistingHasOnlyTaskCountMin()
   {
-    // Neither has taskCount -> should fall back to taskCountMin (4)
-    TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(null, 1);
-    TestSeekableStreamSupervisorSpec newSpec = 
createTestSpecWithExpectedMerge(null, 4, 4);
+    // existingSpec has taskCount = 1, newSpec has no taskCount and 
taskCountMin = 4
+    //   -> carry forward existing taskCount, keep it at 1. We expect the 
autoscaler
+    //      to set the taskCount to the new min when it runs.
+    final TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(null, 
1);
+    final TestSeekableStreamSupervisorSpec newSpec = createTestSpec(null, 4);
+
+    Assert.assertEquals(1, existingSpec.getIoConfig().getTaskCount());
+    Assert.assertEquals(4, newSpec.getIoConfig().getTaskCount());
 
     newSpec.merge(existingSpec);
-    EasyMock.verify(newSpec.getIoConfig());
+
+    Assert.assertEquals(1, newSpec.getIoConfig().getTaskCount());
   }
 
   @Test
@@ -1450,60 +1458,19 @@ public class SupervisorResourceTest extends 
EasyMockSupport
 
   private TestSeekableStreamSupervisorSpec createTestSpec(Integer taskCount, 
int taskCountMin)
   {
-    HashMap<String, Object> autoScalerConfig = new HashMap<>();
-    autoScalerConfig.put("enableTaskAutoScaler", true);
-    autoScalerConfig.put("taskCountMax", 10);
-    autoScalerConfig.put("taskCountMin", taskCountMin);
-
-    SeekableStreamSupervisorIOConfig ioConfig = 
EasyMock.createMock(SeekableStreamSupervisorIOConfig.class);
-    EasyMock.expect(ioConfig.getAutoScalerConfig())
-            .andReturn(OBJECT_MAPPER.convertValue(autoScalerConfig, 
AutoScalerConfig.class))
-            .anyTimes();
-    EasyMock.expect(ioConfig.getTaskCount()).andReturn(taskCount).anyTimes();
-    EasyMock.replay(ioConfig);
-
-    DataSchema dataSchema = EasyMock.createMock(DataSchema.class);
-    
EasyMock.expect(dataSchema.getDataSource()).andReturn("datasource1").anyTimes();
-    EasyMock.replay(dataSchema);
-
-    SeekableStreamSupervisorIngestionSpec ingestionSchema =
-        EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class);
-    
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(ioConfig).anyTimes();
-    
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
-    EasyMock.replay(ingestionSchema);
-
-    return new TestSeekableStreamSupervisorSpec("my-id", ingestionSchema);
-  }
-
-  private TestSeekableStreamSupervisorSpec createTestSpecWithExpectedMerge(
-      Integer taskCount,
-      int taskCountMin,
-      int expectedTaskCount
-  )
-  {
-    HashMap<String, Object> autoScalerConfig = new HashMap<>();
-    autoScalerConfig.put("enableTaskAutoScaler", true);
-    autoScalerConfig.put("taskCountMax", 10);
-    autoScalerConfig.put("taskCountMin", taskCountMin);
-
-    SeekableStreamSupervisorIOConfig ioConfig = 
EasyMock.createMock(SeekableStreamSupervisorIOConfig.class);
-    EasyMock.expect(ioConfig.getAutoScalerConfig())
-            .andReturn(OBJECT_MAPPER.convertValue(autoScalerConfig, 
AutoScalerConfig.class))
-            .anyTimes();
-    EasyMock.expect(ioConfig.getTaskCount()).andReturn(taskCount).anyTimes();
-    ioConfig.setTaskCount(expectedTaskCount);
-    EasyMock.expectLastCall().once();
-    EasyMock.replay(ioConfig);
+    final SeekableStreamSupervisorIOConfig ioConfig = 
SeekableStreamSupervisorTestBase.createIOConfig(
+        taskCount,
+        
SeekableStreamSupervisorTestBase.lagBasedAutoScalerConfig(taskCountMin, 10, 
null)
+    );
 
-    DataSchema dataSchema = EasyMock.createMock(DataSchema.class);
+    final DataSchema dataSchema = EasyMock.createMock(DataSchema.class);
     
EasyMock.expect(dataSchema.getDataSource()).andReturn("datasource1").anyTimes();
     EasyMock.replay(dataSchema);
 
-    SeekableStreamSupervisorIngestionSpec ingestionSchema =
-        EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class);
-    
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(ioConfig).anyTimes();
-    
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
-    EasyMock.replay(ingestionSchema);
+    final SeekableStreamSupervisorIngestionSpec ingestionSchema =
+        new SeekableStreamSupervisorIngestionSpec(dataSchema, ioConfig, null)
+        {
+        };
 
     return new TestSeekableStreamSupervisorSpec("my-id", ingestionSchema);
   }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
index 3974b9bebca..09f8bfde484 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
@@ -77,7 +77,7 @@ public class SeekableStreamSupervisorIOConfigTest
     Assert.assertEquals("stream", config.getStream());
     Assert.assertEquals(inputFormat, config.getInputFormat());
     Assert.assertEquals(Integer.valueOf(1), config.getReplicas());
-    Assert.assertEquals(Integer.valueOf(1), config.getTaskCount());
+    Assert.assertEquals(1, config.getTaskCount());
     Assert.assertEquals(Duration.standardHours(1), config.getTaskDuration());
     Assert.assertEquals(Duration.standardSeconds(5), config.getStartDelay());
     Assert.assertEquals(Duration.standardSeconds(30), config.getPeriod());
@@ -94,47 +94,45 @@ public class SeekableStreamSupervisorIOConfigTest
   }
 
   @Test
-  public void testAutoScalerEnabledPreservesTaskCountWhenNonNull()
+  public void testTaskCountResolutionInConstructor()
   {
-    LagAggregator lagAggregator = mock(LagAggregator.class);
+    // Constructor priority is "explicit taskCount > taskCountStart > 
taskCountMin" so that a
+    // previously autoscaled taskCount survives a Jackson round-trip through 
the metadata store.
 
-    // autoScalerEnabled = true
-    AutoScalerConfig autoScalerConfig = mock(AutoScalerConfig.class);
-    when(autoScalerConfig.getEnableTaskAutoScaler()).thenReturn(true);
-    when(autoScalerConfig.getTaskCountStart()).thenReturn(5);
-    when(autoScalerConfig.getTaskCountMin()).thenReturn(3);
+    // taskCount=10 + taskCountStart=5 -> taskCount wins, isExplicit=true.
+    assertTaskCount(10, autoScaler(5, 3), 10, true);
 
-    SeekableStreamSupervisorIOConfig configAuto = new 
SeekableStreamSupervisorIOConfig(
-        "stream",
-        null,
-        2,
-        10, // (taskCount should be ignored)
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        autoScalerConfig,
-        lagAggregator,
-        null,
-        null,
-        null,
-        null
-    )
-    {
-    };
+    // taskCount=null + taskCountStart=5 -> taskCountStart, isExplicit=false.
+    assertTaskCount(null, autoScaler(5, 3), 5, false);
 
-    Assert.assertEquals(Integer.valueOf(5), configAuto.getTaskCount()); // 
taskCountStart
+    // taskCount=null + no taskCountStart -> taskCountMin, isExplicit=false.
+    assertTaskCount(null, autoScaler(null, 3), 3, false);
 
-    // autoScalerEnabled = false
-    SeekableStreamSupervisorIOConfig configNoAuto = new 
SeekableStreamSupervisorIOConfig(
+    // taskCount=10, no autoscaler -> taskCount, isExplicit=true.
+    assertTaskCount(10, null, 10, true);
+  }
+
+  private static AutoScalerConfig autoScaler(@Nullable Integer taskCountStart, 
int taskCountMin)
+  {
+    final AutoScalerConfig config = mock(AutoScalerConfig.class);
+    when(config.getEnableTaskAutoScaler()).thenReturn(true);
+    when(config.getTaskCountStart()).thenReturn(taskCountStart);
+    when(config.getTaskCountMin()).thenReturn(taskCountMin);
+    return config;
+  }
+
+  private static void assertTaskCount(
+      @Nullable Integer taskCount,
+      @Nullable AutoScalerConfig autoScalerConfig,
+      int expectedTaskCount,
+      boolean expectedExplicit
+  )
+  {
+    final SeekableStreamSupervisorIOConfig config = new 
SeekableStreamSupervisorIOConfig(
         "stream",
         null,
         2,
-        10,
-        null,
+        taskCount,
         null,
         null,
         null,
@@ -142,7 +140,8 @@ public class SeekableStreamSupervisorIOConfigTest
         null,
         null,
         null,
-        lagAggregator,
+        autoScalerConfig,
+        mock(LagAggregator.class),
         null,
         null,
         null,
@@ -150,8 +149,8 @@ public class SeekableStreamSupervisorIOConfigTest
     )
     {
     };
-
-    Assert.assertEquals(Integer.valueOf(10), configNoAuto.getTaskCount());
+    Assert.assertEquals(expectedTaskCount, config.getTaskCount());
+    Assert.assertEquals(expectedExplicit, config.isTaskCountExplicit());
   }
 
   @Test
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index bf3d5f9d71e..8d1f755cec2 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -52,6 +52,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -1219,7 +1221,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     supervisor.runInternal();
     Thread.sleep(1000); // ensure a dynamic allocation notice completes
 
-    Assert.assertEquals(1, supervisor.getIoConfig().getTaskCount().intValue());
+    Assert.assertEquals(1, supervisor.getIoConfig().getTaskCount());
     Assert.assertTrue(
         dynamicActionEmitter
             
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
@@ -1337,71 +1339,83 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
   }
 
   @Test
-  public void testMergeSpecConfigs()
+  public void testMerge_withExistingSpec()
   {
+    // Resolution rules on user POST when an existing spec is in the DB:
+    //   1. new spec's taskCountStart wins (over both explicit taskCount and existing).
+    //   2. else new spec's explicit taskCount wins.
+    //   3. else carry forward existing.taskCount (so autoscaler progress is not lost).
     mockIngestionSchema();
 
-    // Given
-    // Create existing spec with autoscaler config and taskCount set to 5
-    HashMap<String, Object> existingAutoScalerConfig = new HashMap<>();
-    existingAutoScalerConfig.put("enableTaskAutoScaler", true);
-    existingAutoScalerConfig.put("taskCountMax", 8);
-    existingAutoScalerConfig.put("taskCountMin", 1);
+    // existing(taskCount=5, autoscaler) + new(no taskCount, no start) -> carry forward 5.
+    assertMergeResult(spec(5, 1, 8, null), spec(null, 1, 8, null), 5);
 
-    SeekableStreamSupervisorIOConfig existingIoConfig = 
EasyMock.mock(SeekableStreamSupervisorIOConfig.class);
-    EasyMock.expect(existingIoConfig.getAutoScalerConfig())
-            .andReturn(mapper.convertValue(existingAutoScalerConfig, 
AutoScalerConfig.class))
-            .anyTimes();
-    EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(5).anyTimes();
-    EasyMock.replay(existingIoConfig);
-
-    SeekableStreamSupervisorIngestionSpec existingIngestionSchema = 
EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
-    
EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes();
-    
EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
-    EasyMock.expect(existingIngestionSchema.getTuningConfig())
-            .andReturn(seekableStreamSupervisorTuningConfig)
-            .anyTimes();
-    EasyMock.replay(existingIngestionSchema);
+    // existing(5) + new(taskCount=7) -> keep 7.
+    assertMergeResult(spec(5, 1, 8, null), spec(7, 1, 8, null), 7);
 
-    TestSeekableStreamSupervisorSpec existingSpec = 
buildDefaultSupervisorSpecWithIngestionSchema(
-        "id123",
-        existingIngestionSchema
-    );
+    // existing(5) + new(taskCountStart=3) -> 3.
+    assertMergeResult(spec(5, 1, 8, null), spec(null, 1, 8, 3), 3);
 
-    // Create new spec with autoscaler config that has taskCountStart not set (null) and no taskCount set
-    HashMap<String, Object> newAutoScalerConfig = new HashMap<>();
-    newAutoScalerConfig.put("enableTaskAutoScaler", true);
-    newAutoScalerConfig.put("taskCountMax", 8);
-    newAutoScalerConfig.put("taskCountMin", 1);
+    // existing(5) + new(taskCount=7, taskCountStart=3) -> 3 (start beats explicit).
+    assertMergeResult(spec(5, 1, 8, null), spec(7, 1, 8, 3), 3);
 
-    SeekableStreamSupervisorIOConfig newIoConfig = 
EasyMock.mock(SeekableStreamSupervisorIOConfig.class);
-    EasyMock.expect(newIoConfig.getAutoScalerConfig())
-            .andReturn(mapper.convertValue(newAutoScalerConfig, 
AutoScalerConfig.class))
-            .anyTimes();
-    EasyMock.expect(newIoConfig.getTaskCount()).andReturn(null).anyTimes();
-    newIoConfig.setTaskCount(5);
-    EasyMock.expectLastCall().once();
-    EasyMock.replay(newIoConfig);
-
-    SeekableStreamSupervisorIngestionSpec newIngestionSchema = 
EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
-    
EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes();
-    
EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
-    
EasyMock.expect(newIngestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
-    EasyMock.replay(newIngestionSchema);
-
-    TestSeekableStreamSupervisorSpec newSpec = 
buildDefaultSupervisorSpecWithIngestionSchema(
-        "id124",
-        newIngestionSchema
-    );
+    // No autoscaler on new spec -> merge is a no-op; new spec's taskCount stands.
+    assertMergeResult(spec(5, 1, 8, null), buildSpecWithIoConfig("new", 
createIOConfig(7, null)), 7);
+
+    // existing already has an explicit taskCount=6 (e.g. metadata-store round-trip). The
+    // *new* spec's isTaskCountExplicit must drive carry-forward, not the existing's.
+    assertMergeResult(spec(6, 1, 8, 3), spec(null, 1, 8, 3), 3);
+    assertMergeResult(spec(6, 1, 8, 3), spec(null, 1, 8, null), 6);
+  }
+
+  @Test
+  public void testMerge_withNullExistingSpec_appliesTaskCountStartOnFirstPost()
+  {
+    // First-POST coverage. SupervisorManager calls merge(existingSpec) unconditionally,
+    // including when there is no prior spec (existingSpec == null). The constructor prefers
+    // an explicit taskCount over taskCountStart, so merge() must re-apply the
+    // "taskCountStart wins on user POST" rule even on the very first submission.
+    mockIngestionSchema();
 
-    // Before merge, taskCountStart should be null
-    
Assert.assertNull(newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart());
+    assertMergeResult(null, spec(7, 1, 8, 3), 3);    // start beats explicit
+    assertMergeResult(null, spec(null, 1, 8, 3), 3); // start applied
+    assertMergeResult(null, spec(7, 1, 8, null), 7); // explicit kept
+    assertMergeResult(null, spec(null, 2, 8, null), 2); // taskCountMin from constructor
+  }
 
-    // When - merge should copy taskCount from existing spec since new spec has no taskCount
+  private void assertMergeResult(
+      @Nullable TestSeekableStreamSupervisorSpec existingSpec,
+      TestSeekableStreamSupervisorSpec newSpec,
+      int expectedTaskCount
+  )
+  {
     newSpec.merge(existingSpec);
+    Assert.assertEquals(expectedTaskCount, 
newSpec.getIoConfig().getTaskCount());
+  }
+
+  private TestSeekableStreamSupervisorSpec spec(
+      @Nullable Integer taskCount,
+      int taskCountMin,
+      int taskCountMax,
+      @Nullable Integer taskCountStart
+  )
+  {
+    return buildSpecWithIoConfig(
+        "id",
+        createIOConfig(taskCount, lagBasedAutoScalerConfig(taskCountMin, 
taskCountMax, taskCountStart))
+    );
+  }
 
-    // Then - verify setTaskCount was called (EasyMock will verify the mock expectations)
-    EasyMock.verify(newIoConfig);
+  private TestSeekableStreamSupervisorSpec buildSpecWithIoConfig(
+      String id,
+      SeekableStreamSupervisorIOConfig ioConfig
+  )
+  {
+    final SeekableStreamSupervisorIngestionSpec ingestionSchema =
+        new SeekableStreamSupervisorIngestionSpec(dataSchema, ioConfig, 
seekableStreamSupervisorTuningConfig)
+        {
+        };
+    return buildDefaultSupervisorSpecWithIngestionSchema(id, ingestionSchema);
   }
 
   private TestSeekableStreamSupervisorSpec 
buildDefaultSupervisorSpecWithIngestionSchema(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index cb2f2b451c1..4266a002b84 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -3725,7 +3725,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     // minScaleUpDelay = 0 means any scale-up is immediately allowed.
     supervisor.handleDynamicAllocationTasksNotice(() -> 5, () -> {}, 
scalingEmitter);
 
-    Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount().intValue());
+    Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount());
 
     final List<ServiceMetricEvent> events =
         
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
@@ -3744,11 +3744,11 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
 
     // First scale-up succeeds and stamps the last-scale timestamp.
     supervisor.handleDynamicAllocationTasksNotice(() -> 5, () -> {}, 
scalingEmitter);
-    Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount().intValue());
+    Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount());
 
     // Second scale-up is within the 1h minScaleUpDelay window and must be blocked.
     supervisor.handleDynamicAllocationTasksNotice(() -> 7, () -> {}, 
scalingEmitter);
-    Assert.assertEquals("Second scale-up must not take effect", 5, 
supervisor.getIoConfig().getTaskCount().intValue());
+    Assert.assertEquals("Second scale-up must not take effect", 5, 
supervisor.getIoConfig().getTaskCount());
 
     final List<ServiceMetricEvent> events =
         
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
@@ -3771,7 +3771,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     // minScaleDownDelay = 0 means any scale-down is immediately allowed.
     supervisor.handleDynamicAllocationTasksNotice(() -> 2, () -> {}, 
scalingEmitter);
 
-    Assert.assertEquals(2, supervisor.getIoConfig().getTaskCount().intValue());
+    Assert.assertEquals(2, supervisor.getIoConfig().getTaskCount());
 
     final List<ServiceMetricEvent> events =
         
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
@@ -3790,11 +3790,11 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
 
     // First scale-down succeeds and stamps the last-scale timestamp.
     supervisor.handleDynamicAllocationTasksNotice(() -> 3, () -> {}, 
scalingEmitter);
-    Assert.assertEquals(3, supervisor.getIoConfig().getTaskCount().intValue());
+    Assert.assertEquals(3, supervisor.getIoConfig().getTaskCount());
 
     // Second scale-down is within the 1h minScaleDownDelay window and must be blocked.
     supervisor.handleDynamicAllocationTasksNotice(() -> 1, () -> {}, 
scalingEmitter);
-    Assert.assertEquals("Second scale-down must not take effect", 3, 
supervisor.getIoConfig().getTaskCount().intValue());
+    Assert.assertEquals("Second scale-down must not take effect", 3, 
supervisor.getIoConfig().getTaskCount());
 
     final List<ServiceMetricEvent> events =
         
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
index 35d063e88e1..82abda059a1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
@@ -46,7 +46,7 @@ import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningCon
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
-import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
+import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -66,6 +66,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -518,7 +519,10 @@ public abstract class SeekableStreamSupervisorTestBase
                      .build();
   }
 
-  protected SeekableStreamSupervisorIOConfig createIOConfig(int taskCount, 
CostBasedAutoScalerConfig autoScalerConfig)
+  public static SeekableStreamSupervisorIOConfig createIOConfig(
+      Integer taskCount,
+      AutoScalerConfig autoScalerConfig
+  )
   {
     return new SeekableStreamSupervisorIOConfig(
         STREAM,
@@ -542,4 +546,16 @@ public abstract class SeekableStreamSupervisorTestBase
     {
     };
   }
+
+  public static AutoScalerConfig lagBasedAutoScalerConfig(int taskCountMin, 
int taskCountMax, Integer taskCountStart)
+  {
+    final HashMap<String, Object> config = new HashMap<>();
+    config.put("enableTaskAutoScaler", true);
+    config.put("taskCountMin", taskCountMin);
+    config.put("taskCountMax", taskCountMax);
+    if (taskCountStart != null) {
+      config.put("taskCountStart", taskCountStart);
+    }
+    return OBJECT_MAPPER.convertValue(config, AutoScalerConfig.class);
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index bf50c7cf48c..67575057ceb 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -27,7 +27,7 @@ import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAu
 import org.apache.druid.server.security.ResourceAction;
 
 import javax.annotation.Nonnull;
-import javax.validation.constraints.NotNull;
+import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Set;
 
@@ -130,12 +130,12 @@ public interface SupervisorSpec
   }
 
   /**
-   * Updates this supervisor spec by merging values from the given {@code existingSpec}.
+   * Updates this user-submitted supervisor spec by merging values from the given {@code existingSpec}.
    * This method may be used to carry forward existing spec values when a supervisor is being resubmitted.
    *
    * @param existingSpec used spec to merge values from
    */
-  default void merge(@NotNull SupervisorSpec existingSpec)
+  default void merge(@Nullable SupervisorSpec existingSpec)
   {
     // No-op by default
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to