This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4aedee9edf8 Improve lag-based autoscaler config persistence (#18745)
4aedee9edf8 is described below

commit 4aedee9edf8c7a9d16d964a77f55ce44691a7509
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Fri Dec 12 11:03:10 2025 +0200

    Improve lag-based autoscaler config persistence (#18745)
    
    Changes:
    * Add `SupervisorSpec.merge()` to merge the task count from an existing running 
supervisor
    * Fix up priority of `taskCount` vs `taskCountStart` vs `taskCountMin`
---
 .../overlord/supervisor/SupervisorManager.java     |  28 ++--
 .../supervisor/SeekableStreamSupervisor.java       |  24 ++-
 .../supervisor/SeekableStreamSupervisorSpec.java   |  45 ++++-
 .../supervisor/autoscaler/LagBasedAutoScaler.java  |  28 ++--
 .../autoscaler/LagBasedAutoScalerConfig.java       |   4 +-
 .../supervisor/SupervisorResourceTest.java         | 181 ++++++++++++++++++++-
 .../SeekableStreamSupervisorSpecTest.java          | 142 +++++++++++++---
 .../overlord/supervisor/SupervisorSpec.java        |  12 ++
 8 files changed, 404 insertions(+), 60 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 168b956afd2..21d2a626501 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -174,7 +174,10 @@ public class SupervisorManager
     synchronized (lock) {
       Preconditions.checkState(started, "SupervisorManager not started");
       final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
-      possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
+      SupervisorSpec existingSpec = 
possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
+      if (existingSpec != null) {
+        spec.merge(existingSpec);
+      }
       createAndStartSupervisorInternal(spec, shouldUpdateSpec);
       return shouldUpdateSpec;
     }
@@ -183,6 +186,7 @@ public class SupervisorManager
   /**
    * Checks whether the submitted SupervisorSpec differs from the current spec 
in SupervisorManager's supervisor list.
    * This is used in SupervisorResource specPost to determine whether the 
Supervisor needs to be restarted
+   *
    * @param spec The spec submitted
    * @return boolean - true only if the spec has been modified, false otherwise
    */
@@ -221,7 +225,7 @@ public class SupervisorManager
 
     synchronized (lock) {
       Preconditions.checkState(started, "SupervisorManager not started");
-      return possiblyStopAndRemoveSupervisorInternal(id, true);
+      return possiblyStopAndRemoveSupervisorInternal(id, true) != null;
     }
   }
 
@@ -299,7 +303,8 @@ public class SupervisorManager
     log.info("SupervisorManager stopped.");
   }
 
-  public List<VersionedSupervisorSpec> getSupervisorHistoryForId(String id, 
@Nullable Integer limit) throws IllegalArgumentException
+  public List<VersionedSupervisorSpec> getSupervisorHistoryForId(String id, 
@Nullable Integer limit)
+      throws IllegalArgumentException
   {
     return metadataSupervisorManager.getAllForId(id, limit);
   }
@@ -429,13 +434,14 @@ public class SupervisorManager
    * Caller should have acquired [lock] before invoking this method to avoid 
contention with other threads that may be
    * starting, stopping, suspending and resuming supervisors.
    *
-   * @return true if a supervisor was stopped, false if there was no 
supervisor with this id
+   * @return the spec of the existing supervisor if it existed and was stopped, or null 
if there was no supervisor with this id
    */
-  private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean 
writeTombstone)
+  @Nullable
+  private SupervisorSpec possiblyStopAndRemoveSupervisorInternal(String id, 
boolean writeTombstone)
   {
     Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id);
-    if (pair == null) {
-      return false;
+    if (pair == null || pair.rhs == null || pair.lhs == null) {
+      return null;
     }
 
     if (writeTombstone) {
@@ -447,13 +453,13 @@ public class SupervisorManager
     pair.lhs.stop(true);
     supervisors.remove(id);
 
-    SupervisorTaskAutoScaler autoscler = autoscalers.get(id);
-    if (autoscler != null) {
-      autoscler.stop();
+    SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+    if (autoscaler != null) {
+      autoscaler.stop();
       autoscalers.remove(id);
     }
 
-    return true;
+    return pair.rhs;
   }
 
   /**
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 0f2839e7fba..f69dc52159b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -135,8 +135,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
- * this class is the parent class of both the Kafka and Kinesis supervisor. 
All the main run loop
- * logic are similar enough so they're grouped together into this class.
+ * This class is the parent class of both the Kafka and Kinesis supervisors. 
Their main run loop
+ * logic is similar enough that it is grouped together in this class.
  * <p>
  * Supervisor responsible for managing the SeekableStreamIndexTasks 
(Kafka/Kinesis) for a single dataSource. At a high level, the class accepts a
  * {@link SeekableStreamSupervisorSpec} which includes the stream name (topic 
/ stream) and configuration as well as an ingestion spec which will
@@ -541,10 +541,20 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   /**
    * This method determines how to do scale actions based on collected lag 
points.
-   * If scale action is triggered :
-   * First of all, call gracefulShutdownInternal() which will change the state 
of current datasource ingest tasks from reading to publishing.
-   * Secondly, clear all the stateful data structures: 
activelyReadingTaskGroups, partitionGroups, partitionOffsets, 
pendingCompletionTaskGroups, partitionIds. These structures will be rebuiled in 
the next 'RunNotice'.
-   * Finally, change the taskCount in SeekableStreamSupervisorIOConfig and 
sync it to MetadataStorage.
+   * If scale action is triggered:
+   * <ul>
+   * <li>First, call <code>gracefulShutdownInternal()</code> which will change 
the state of current datasource ingest tasks from reading to publishing.
+   * <li>Secondly, clear all the stateful data structures:
+   * <ul>
+   *  <li><code>activelyReadingTaskGroups</code>,
+   *  <li><code>partitionGroups</code>,
+   *  <li><code>partitionOffsets</code>,
+   *  <li><code>pendingCompletionTaskGroups</code>,
+   *  <li><code>partitionIds</code>.
+   * </ul>
+   * These structures will be rebuilt in the next 'RunNotice'.
+   * <li>Finally, change the <code>taskCount</code> in 
<code>SeekableStreamSupervisorIOConfig</code> and sync it to 
<code>MetadataStorage</code>.
+   * </ul>
    * After the taskCount is changed in SeekableStreamSupervisorIOConfig, next 
RunNotice will create scaled number of ingest tasks without resubmitting the 
supervisor.
    *
    * @param desiredActiveTaskCount desired taskCount computed from AutoScaler
@@ -916,7 +926,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   private volatile boolean lifecycleStarted = false;
   private final ServiceEmitter emitter;
 
-  // snapshots latest sequences from stream to be verified in next run cycle 
of inactive stream check
+  // snapshots latest sequences from the stream to be verified in the next run 
cycle of inactive stream check
   private final Map<PartitionIdType, SequenceOffsetType> 
previousSequencesFromStream = new HashMap<>();
   private long lastActiveTimeMillis;
   private final IdleConfig idleConfig;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 967652673f7..f21e073f6c4 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -43,16 +43,18 @@ import 
org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 
 import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
 import java.util.List;
 import java.util.Map;
 
 public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
 {
-  protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE = 
"Update of the input source stream from [%s] to [%s] is not supported for a 
running supervisor."
-                                                                   + "%nTo 
perform the update safely, follow these steps:"
-                                                                   + "%n(1) 
Suspend this supervisor, reset its offsets and then terminate it. "
-                                                                   + "%n(2) 
Create a new supervisor with the new input source stream."
-                                                                   + "%nNote 
that doing the reset can cause data duplication or loss if any topic used in 
the old supervisor is included in the new one too.";
+  protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE =
+      "Update of the input source stream from [%s] to [%s] is not supported 
for a running supervisor."
+      + "%nTo perform the update safely, follow these steps:"
+      + "%n(1) Suspend this supervisor, reset its offsets and then terminate 
it. "
+      + "%n(2) Create a new supervisor with the new input source stream."
+      + "%nNote that doing the reset can cause data duplication or loss if any 
topic used in the old supervisor is included in the new one too.";
 
   private static SeekableStreamSupervisorIngestionSpec checkIngestionSchema(
       SeekableStreamSupervisorIngestionSpec ingestionSchema
@@ -183,6 +185,7 @@ public abstract class SeekableStreamSupervisorSpec 
implements SupervisorSpec
 
   /**
    * An autoScaler instance will be returned depending on the 
autoScalerConfig. In case autoScalerConfig is null or autoScaler is disabled 
then NoopTaskAutoScaler will be returned.
+   *
    * @param supervisor
    * @return autoScaler
    */
@@ -232,6 +235,7 @@ public abstract class SeekableStreamSupervisorSpec 
implements SupervisorSpec
    * <li>You cannot migrate between types of supervisors.</li>
    * <li>You cannot change the input source stream of a running 
supervisor.</li>
    * </ul>
+   *
    * @param proposedSpec the proposed supervisor spec
    * @throws DruidException if the proposed spec update is not allowed
    */
@@ -240,7 +244,9 @@ public abstract class SeekableStreamSupervisorSpec 
implements SupervisorSpec
   {
     if (!(proposedSpec instanceof SeekableStreamSupervisorSpec)) {
       throw InvalidInput.exception(
-          "Cannot update supervisor spec from type[%s] to type[%s]", 
getClass().getSimpleName(), proposedSpec.getClass().getSimpleName()
+          "Cannot update supervisor spec from type[%s] to type[%s]",
+          getClass().getSimpleName(),
+          proposedSpec.getClass().getSimpleName()
       );
     }
     SeekableStreamSupervisorSpec other = (SeekableStreamSupervisorSpec) 
proposedSpec;
@@ -255,6 +261,33 @@ public abstract class SeekableStreamSupervisorSpec 
implements SupervisorSpec
     }
   }
 
+  @Override
+  public void merge(@NotNull SupervisorSpec existingSpec)
+  {
+    AutoScalerConfig thisAutoScalerConfig = 
this.getIoConfig().getAutoScalerConfig();
+    // If the autoscaler is absent or taskCountStart is specified, there is 
nothing to merge.
+    if (thisAutoScalerConfig == null || 
thisAutoScalerConfig.getTaskCountStart() != null) {
+      return;
+    }
+
+    // Use a switch expression with pattern matching when we move to Java 21 
as a minimum requirement.
+    if (existingSpec instanceof SeekableStreamSupervisorSpec) {
+      SeekableStreamSupervisorSpec spec = (SeekableStreamSupervisorSpec) 
existingSpec;
+      AutoScalerConfig autoScalerConfig = 
spec.getIoConfig().getAutoScalerConfig();
+      if (autoScalerConfig == null) {
+        return;
+      }
+      // provided `taskCountStart` > provided `taskCount` > existing 
`taskCount` > provided `taskCountMin`.
+      int taskCount = thisAutoScalerConfig.getTaskCountMin();
+      if (this.getIoConfig().getTaskCount() != null) {
+        taskCount = this.getIoConfig().getTaskCount();
+      } else if (spec.getIoConfig().getTaskCount() != null) {
+        taskCount = spec.getIoConfig().getTaskCount();
+      }
+      this.getIoConfig().setTaskCount(taskCount);
+    }
+  }
+
   protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean 
suspend);
 
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index b1446c3d4c1..142193ae63f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -124,8 +124,10 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
     );
     log.info(
         "LagBasedAutoScaler will collect lag every [%d] millis and will keep 
up to [%d] data points for the last [%d] millis for dataSource [%s]",
-        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), 
lagMetricsQueue.maxSize(),
-        lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+        lagMetricsQueue.maxSize(),
+        lagBasedAutoScalerConfig.getLagCollectionRangeMillis(),
+        dataSource
     );
   }
 
@@ -192,19 +194,25 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
 
   /**
    * This method determines whether to do scale actions based on collected lag 
points.
-   * Current algorithm of scale is simple:
-   * First of all, compute the proportion of lag points higher/lower than 
scaleOutThreshold/scaleInThreshold, getting scaleOutThreshold/scaleInThreshold.
-   * Secondly, compare scaleOutThreshold/scaleInThreshold with 
triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold. P.S. Scale 
out action has higher priority than scale in action.
-   * Finaly, if scaleOutThreshold/scaleInThreshold is higher than 
triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold, scale out/in 
action would be triggered.
+   * The current algorithm of scale is straightforward:
+   * <ul>
+   * <li>First, compute the proportion of lag points higher/lower than {@code 
scaleOutThreshold/scaleInThreshold},
+   * getting {@code scaleOutThreshold/scaleInThreshold}.
+   * <li>Secondly, compare {@code scaleOutThreshold/scaleInThreshold} with
+   * {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}.
+   * <ul><li>P.S. Scale out action has a higher priority than scale in 
action.</ul>
+   * <li>Finally, if {@code scaleOutThreshold/scaleInThreshold} is higher than
+   * {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}, 
scale out/in action would be triggered.
+   * </ul>
    *
-   * @param lags the lag metrics of Stream(Kafka/Kinesis)
-   * @return Integer. target number of tasksCount, -1 means skip scale action.
+   * @param lags the lag metrics of Stream (Kafka/Kinesis)
+   * @return the target task count, or -1 to skip the scale action.
    */
   private int computeDesiredTaskCount(List<Long> lags)
   {
-    // if supervisor is not suspended, ensure required tasks are running
+    // if the supervisor is not suspended, ensure required tasks are running
     // if suspended, ensure tasks have been requested to gracefully stop
-    log.debug("Computing desired task count for [%s], based on following lags 
: [%s]", dataSource, lags);
+    log.debug("Computing the desired task count for [%s], based on following 
lags : [%s]", dataSource, lags);
     int beyond = 0;
     int within = 0;
     int metricsCount = lags.size();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
index b4a9b0e8891..ad036dd0e10 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
@@ -103,8 +103,8 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
 
     this.scaleInStep = scaleInStep != null ? scaleInStep : 1;
     this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2;
-    this.minTriggerScaleActionFrequencyMillis = 
minTriggerScaleActionFrequencyMillis
-        != null ? minTriggerScaleActionFrequencyMillis : 600000;
+    this.minTriggerScaleActionFrequencyMillis =
+        minTriggerScaleActionFrequencyMillis != null ? 
minTriggerScaleActionFrequencyMillis : 600000;
 
     Preconditions.checkArgument(
         stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 && 
stopTaskCountRatio <= 1.0),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 538fd4a9e78..0737722d341 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -27,12 +27,23 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.audit.AuditManager;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskStorage;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import 
org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthConfig;
@@ -54,6 +65,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
 import java.util.Collections;
@@ -61,6 +73,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 @RunWith(EasyMockRunner.class)
@@ -1083,7 +1096,9 @@ public class SupervisorResourceTest extends 
EasyMockSupport
     EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id1", 
null)).andReturn(versions1).times(1);
     EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id2", 
null)).andReturn(versions2).times(1);
     EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id3", 
null)).andReturn(versions3).times(1);
-    EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id4", 
null)).andReturn(Collections.emptyList()).times(1);
+    EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id4", null))
+            .andReturn(Collections.emptyList())
+            .times(1);
     setupMockRequestForUser("notdruid");
     replayAll();
 
@@ -1183,12 +1198,18 @@ public class SupervisorResourceTest extends 
EasyMockSupport
     // Test with limit=0 (should return 400 Bad Request)
     response = supervisorResource.specGetHistory(request, "id1", 0);
     Assert.assertEquals(400, response.getStatus());
-    Assert.assertEquals(ImmutableMap.of("error", "Count must be greater than 
zero if set (count was 0)"), response.getEntity());
+    Assert.assertEquals(
+        ImmutableMap.of("error", "Count must be greater than zero if set 
(count was 0)"),
+        response.getEntity()
+    );
 
     // Test with negative limit (should return 400 Bad Request)
     response = supervisorResource.specGetHistory(request, "id1", -1);
     Assert.assertEquals(400, response.getStatus());
-    Assert.assertEquals(ImmutableMap.of("error", "Count must be greater than 
zero if set (count was -1)"), response.getEntity());
+    Assert.assertEquals(
+        ImmutableMap.of("error", "Count must be greater than zero if set 
(count was -1)"),
+        response.getEntity()
+    );
 
     // Test with limit larger than available history
     response = supervisorResource.specGetHistory(request, "id1", 100);
@@ -1320,6 +1341,99 @@ public class SupervisorResourceTest extends 
EasyMockSupport
     Assert.assertEquals(spec, specRoundTrip);
   }
 
+  @Test
+  public void 
testSpecPostMergeUsesExistingTaskCountHigherPriorityHasBeenMissed()
+  {
+    // New spec has no taskCount -> should use existing taskCount (5)
+    TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(5, 1);
+    TestSeekableStreamSupervisorSpec newSpec = 
createTestSpecWithExpectedMerge(null, 2, 5);
+
+    newSpec.merge(existingSpec);
+    EasyMock.verify(newSpec.getIoConfig());
+  }
+
+  @Test
+  public void testSpecPostMergeUsesProvidedTaskCountOverExistingTaskCount()
+  {
+    // New spec has taskCount=3 -> should use provided taskCount over existing 
(5)
+    TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(5, 1);
+    TestSeekableStreamSupervisorSpec newSpec = 
createTestSpecWithExpectedMerge(3, 2, 3);
+
+    newSpec.merge(existingSpec);
+    EasyMock.verify(newSpec.getIoConfig());
+  }
+
+  @Test
+  public void testSpecPostMergeFallsBackToProvidedTaskCountMin()
+  {
+    // Neither has taskCount -> should fall back to taskCountMin (4)
+    TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(null, 1);
+    TestSeekableStreamSupervisorSpec newSpec = 
createTestSpecWithExpectedMerge(null, 4, 4);
+
+    newSpec.merge(existingSpec);
+    EasyMock.verify(newSpec.getIoConfig());
+  }
+
+  private TestSeekableStreamSupervisorSpec createTestSpec(Integer taskCount, 
int taskCountMin)
+  {
+    HashMap<String, Object> autoScalerConfig = new HashMap<>();
+    autoScalerConfig.put("enableTaskAutoScaler", true);
+    autoScalerConfig.put("taskCountMax", 10);
+    autoScalerConfig.put("taskCountMin", taskCountMin);
+
+    SeekableStreamSupervisorIOConfig ioConfig = 
EasyMock.createMock(SeekableStreamSupervisorIOConfig.class);
+    EasyMock.expect(ioConfig.getAutoScalerConfig())
+            .andReturn(OBJECT_MAPPER.convertValue(autoScalerConfig, 
AutoScalerConfig.class))
+            .anyTimes();
+    EasyMock.expect(ioConfig.getTaskCount()).andReturn(taskCount).anyTimes();
+    EasyMock.replay(ioConfig);
+
+    DataSchema dataSchema = EasyMock.createMock(DataSchema.class);
+    
EasyMock.expect(dataSchema.getDataSource()).andReturn("datasource1").anyTimes();
+    EasyMock.replay(dataSchema);
+
+    SeekableStreamSupervisorIngestionSpec ingestionSchema =
+        EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class);
+    
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(ioConfig).anyTimes();
+    
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    return new TestSeekableStreamSupervisorSpec("my-id", ingestionSchema);
+  }
+
+  private TestSeekableStreamSupervisorSpec createTestSpecWithExpectedMerge(
+      Integer taskCount,
+      int taskCountMin,
+      int expectedTaskCount
+  )
+  {
+    HashMap<String, Object> autoScalerConfig = new HashMap<>();
+    autoScalerConfig.put("enableTaskAutoScaler", true);
+    autoScalerConfig.put("taskCountMax", 10);
+    autoScalerConfig.put("taskCountMin", taskCountMin);
+
+    SeekableStreamSupervisorIOConfig ioConfig = 
EasyMock.createMock(SeekableStreamSupervisorIOConfig.class);
+    EasyMock.expect(ioConfig.getAutoScalerConfig())
+            .andReturn(OBJECT_MAPPER.convertValue(autoScalerConfig, 
AutoScalerConfig.class))
+            .anyTimes();
+    EasyMock.expect(ioConfig.getTaskCount()).andReturn(taskCount).anyTimes();
+    ioConfig.setTaskCount(expectedTaskCount);
+    EasyMock.expectLastCall().once();
+    EasyMock.replay(ioConfig);
+
+    DataSchema dataSchema = EasyMock.createMock(DataSchema.class);
+    
EasyMock.expect(dataSchema.getDataSource()).andReturn("datasource1").anyTimes();
+    EasyMock.replay(dataSchema);
+
+    SeekableStreamSupervisorIngestionSpec ingestionSchema =
+        EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class);
+    
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(ioConfig).anyTimes();
+    
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    return new TestSeekableStreamSupervisorSpec("my-id", ingestionSchema);
+  }
+
   private void setupMockRequest()
   {
     setupMockRequestForUser("druid");
@@ -1445,10 +1559,10 @@ public class SupervisorResourceTest extends 
EasyMockSupport
       if (getId() != null ? !getId().equals(that.getId()) : that.getId() != 
null) {
         return false;
       }
-      if (supervisor != null ? !supervisor.equals(that.supervisor) : 
that.supervisor != null) {
+      if (!Objects.equals(supervisor, that.supervisor)) {
         return false;
       }
-      if (datasources != null ? !datasources.equals(that.datasources) : 
that.datasources != null) {
+      if (!Objects.equals(datasources, that.datasources)) {
         return false;
       }
       return isSuspended() == that.isSuspended();
@@ -1464,4 +1578,61 @@ public class SupervisorResourceTest extends 
EasyMockSupport
       return result;
     }
   }
+
+  static class TestSeekableStreamSupervisorSpec extends 
SeekableStreamSupervisorSpec
+  {
+    public TestSeekableStreamSupervisorSpec(
+        @Nullable String id,
+        SeekableStreamSupervisorIngestionSpec ingestionSchema
+    )
+    {
+      super(
+          id,
+          ingestionSchema,
+          null,
+          false,
+          EasyMock.createMock(TaskStorage.class),
+          EasyMock.createMock(TaskMaster.class),
+          EasyMock.createMock(IndexerMetadataStorageCoordinator.class),
+          EasyMock.createMock(SeekableStreamIndexTaskClientFactory.class),
+          OBJECT_MAPPER,
+          EasyMock.createMock(ServiceEmitter.class),
+          EasyMock.createMock(DruidMonitorSchedulerConfig.class),
+          EasyMock.createMock(RowIngestionMetersFactory.class),
+          EasyMock.createMock(SupervisorStateManagerConfig.class)
+      );
+    }
+
+    @Override
+    public Supervisor createSupervisor()
+    {
+      return null;
+    }
+
+    @Override
+    public String getType()
+    {
+      return "test";
+    }
+
+    @Override
+    public String getSource()
+    {
+      return "test-stream";
+    }
+
+    @Override
+    protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend)
+    {
+      return null;
+    }
+
+    @JsonIgnore
+    @Nonnull
+    @Override
+    public Set<ResourceAction> getInputSourceResources() throws 
UnsupportedOperationException
+    {
+      return Collections.singleton(new ResourceAction(new Resource("test", 
ResourceType.EXTERNAL), Action.READ));
+    }
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index e96e3887318..58ffb8438e0 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -566,14 +566,16 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
 
     Exception e = null;
     try {
-      AutoScalerConfig autoScalerError = mapper.convertValue(ImmutableMap.of(
-          "enableTaskAutoScaler",
-          "true",
-          "taskCountMax",
-          "1",
-          "taskCountMin",
-          "4"
-      ), AutoScalerConfig.class);
+      AutoScalerConfig autoScalerError = mapper.convertValue(
+          ImmutableMap.of(
+              "enableTaskAutoScaler",
+              "true",
+              "taskCountMax",
+              "1",
+              "taskCountMin",
+              "4"
+          ), AutoScalerConfig.class
+      );
     }
     catch (RuntimeException ex) {
       e = ex;
@@ -685,16 +687,18 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     EasyMock.replay(ingestionSchema);
 
     EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
-            .andReturn(mapper.convertValue(ImmutableMap.of(
-                "lagCollectionIntervalMillis",
-                "1",
-                "enableTaskAutoScaler",
-                true,
-                "taskCountMax",
-                "4",
-                "taskCountMin",
-                "1"
-            ), AutoScalerConfig.class))
+            .andReturn(mapper.convertValue(
+                ImmutableMap.of(
+                    "lagCollectionIntervalMillis",
+                    "1",
+                    "enableTaskAutoScaler",
+                    true,
+                    "taskCountMax",
+                    "4",
+                    "taskCountMin",
+                    "1"
+                ), AutoScalerConfig.class
+            ))
             .anyTimes();
     
EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes();
     EasyMock.replay(seekableStreamSupervisorIOConfig);
@@ -775,6 +779,7 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     supervisor.start();
     autoScaler.start();
     supervisor.runInternal();
+
     int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(1, taskCountBeforeScaleOut);
     Thread.sleep(1000);
@@ -1003,9 +1008,11 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     supervisor.start();
     autoScaler.start();
     supervisor.runInternal();
+
     int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(1, taskCountBeforeScaleOut);
     Thread.sleep(1000);
+
     int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(2, taskCountAfterScaleOut);
     
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 
1);
@@ -1050,11 +1057,14 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     // enable autoscaler so that taskcount config will be ignored and init 
value of taskCount will use taskCountMin.
     Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
     supervisor.getIoConfig().setTaskCount(2);
+
     supervisor.start();
     autoScaler.start();
     supervisor.runInternal();
+
     int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(2, taskCountBeforeScaleOut);
+
     Thread.sleep(1000);
     int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(1, taskCountAfterScaleOut);
@@ -1102,15 +1112,18 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
         emitter
     );
 
-    // enable autoscaler so that taskcount config will be ignored and init 
value of taskCount will use taskCountMin.
+    // enable autoscaler so that taskcount config will be ignored and the init 
value of taskCount will use taskCountMin.
     Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
     supervisor.getIoConfig().setTaskCount(2);
+
+    // When
     supervisor.start();
     autoScaler.start();
     supervisor.runInternal();
 
     Assert.assertEquals(2, (int) supervisor.getIoConfig().getTaskCount());
     Thread.sleep(2000);
+    // Then
     Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount());
 
     
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 
1);
@@ -1580,6 +1593,97 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     originalSpec.validateSpecUpdateTo(proposedSpecSameSource);
   }
 
+  @Test
+  public void testMergeSpecConfigs()
+  {
+    mockIngestionSchema();
+
+    // Given
+    // Create existing spec with autoscaler config and taskCount set to 5
+    HashMap<String, Object> existingAutoScalerConfig = new HashMap<>();
+    existingAutoScalerConfig.put("enableTaskAutoScaler", true);
+    existingAutoScalerConfig.put("taskCountMax", 8);
+    existingAutoScalerConfig.put("taskCountMin", 1);
+
+    SeekableStreamSupervisorIOConfig existingIoConfig = 
EasyMock.mock(SeekableStreamSupervisorIOConfig.class);
+    EasyMock.expect(existingIoConfig.getAutoScalerConfig())
+            .andReturn(mapper.convertValue(existingAutoScalerConfig, 
AutoScalerConfig.class))
+            .anyTimes();
+    EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(5).anyTimes();
+    EasyMock.replay(existingIoConfig);
+
+    SeekableStreamSupervisorIngestionSpec existingIngestionSchema = 
EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
+    
EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes();
+    
EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    EasyMock.expect(existingIngestionSchema.getTuningConfig())
+            .andReturn(seekableStreamSupervisorTuningConfig)
+            .anyTimes();
+    EasyMock.replay(existingIngestionSchema);
+
+    TestSeekableStreamSupervisorSpec existingSpec = 
buildDefaultSupervisorSpecWithIngestionSchema(
+        "id123",
+        existingIngestionSchema
+    );
+
+    // Create new spec with autoscaler config that has taskCountStart not set 
(null) and no taskCount set
+    HashMap<String, Object> newAutoScalerConfig = new HashMap<>();
+    newAutoScalerConfig.put("enableTaskAutoScaler", true);
+    newAutoScalerConfig.put("taskCountMax", 8);
+    newAutoScalerConfig.put("taskCountMin", 1);
+
+    SeekableStreamSupervisorIOConfig newIoConfig = 
EasyMock.mock(SeekableStreamSupervisorIOConfig.class);
+    EasyMock.expect(newIoConfig.getAutoScalerConfig())
+            .andReturn(mapper.convertValue(newAutoScalerConfig, 
AutoScalerConfig.class))
+            .anyTimes();
+    EasyMock.expect(newIoConfig.getTaskCount()).andReturn(null).anyTimes();
+    newIoConfig.setTaskCount(5);
+    EasyMock.expectLastCall().once();
+    EasyMock.replay(newIoConfig);
+
+    SeekableStreamSupervisorIngestionSpec newIngestionSchema = 
EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
+    
EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes();
+    
EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    
EasyMock.expect(newIngestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(newIngestionSchema);
+
+    TestSeekableStreamSupervisorSpec newSpec = 
buildDefaultSupervisorSpecWithIngestionSchema(
+        "id124",
+        newIngestionSchema
+    );
+
+    // Before merge, taskCountStart should be null
+    
Assert.assertNull(newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart());
+
+    // When - merge should copy taskCount from existing spec since new spec 
has no taskCount
+    newSpec.merge(existingSpec);
+
+    // Then - verify setTaskCount was called (EasyMock will verify the mock 
expectations)
+    EasyMock.verify(newIoConfig);
+  }
+
+  private TestSeekableStreamSupervisorSpec 
buildDefaultSupervisorSpecWithIngestionSchema(
+      String id,
+      SeekableStreamSupervisorIngestionSpec ingestionSchema
+  )
+  {
+    return new TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        null,
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        id
+    );
+  }
+
   private void mockIngestionSchema()
   {
     
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index 9ff217d5404..377223308b0 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -27,6 +27,7 @@ import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAu
 import org.apache.druid.server.security.ResourceAction;
 
 import javax.annotation.Nonnull;
+import javax.validation.constraints.NotNull;
 import java.util.List;
 import java.util.Set;
 
@@ -113,4 +114,15 @@ public interface SupervisorSpec
   {
     // The default implementation does not do any validation checks.
   }
+
+  /**
+   * Updates this supervisor spec by merging values from the given {@code 
existingSpec}.
+   * This method may be used to carry forward existing spec values when a 
supervisor is being resubmitted.
+   *
+   * @param existingSpec used spec to merge values from
+   */
+  default void merge(@NotNull SupervisorSpec existingSpec)
+  {
+    // No-op by default
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to