kfaraz commented on code in PR #18466:
URL: https://github.com/apache/druid/pull/18466#discussion_r2378645919


##########
indexing-service/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java:
##########
@@ -556,4 +556,30 @@ private void verifyTaskStatus(TaskStatus expected, 
TaskStatus actual)
   {
     Assert.assertEquals(expected, actual);
   }
+
+  @Test
+  public void testUpdateTask()

Review Comment:
   I don't think this is needed anymore.



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumberTest.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.common;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.validation.constraints.NotNull;
+
+public class OrderedSequenceNumberTest
+{
+  @Test
+  public void test_isMoreToReadBeforeReadingRecord_exclusiveEnd_lessThan()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(5L, false);
+    TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+    Assert.assertTrue("Should have more to read when current < end with 
exclusive end",
+                     current.isMoreToReadBeforeReadingRecord(end, true));
+  }
+
+  @Test
+  public void test_isMoreToReadBeforeReadingRecord_exclusiveEnd_equalTo()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(10L, false);
+    TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+    Assert.assertFalse("Should NOT have more to read when current == end with 
exclusive end",
+                      current.isMoreToReadBeforeReadingRecord(end, true));
+  }
+
+  @Test
+  public void testIsMoreToReadBeforeReadingRecord_exclusiveEnd_greaterThan()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(15L, false);
+    TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+    Assert.assertFalse("Should NOT have more to read when current > end with 
exclusive end",
+                      current.isMoreToReadBeforeReadingRecord(end, true));
+  }
+
+  @Test
+  public void testIsMoreToReadBeforeReadingRecord_inclusiveEnd_lessThan()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(5L, false);
+    TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+    Assert.assertTrue("Should have more to read when current < end with 
inclusive end",
+                     current.isMoreToReadBeforeReadingRecord(end, false));
+  }
+
+  @Test
+  public void testIsMoreToReadBeforeReadingRecord_inclusiveEnd_equalTo()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(10L, false);
+    TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+    Assert.assertTrue("Should have more to read when current == end with 
inclusive end",
+                     current.isMoreToReadBeforeReadingRecord(end, false));
+  }
+
+  @Test
+  public void testIsMoreToReadBeforeReadingRecord_inclusiveEnd_greaterThan()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(15L, false);
+    TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+    Assert.assertFalse("Should NOT have more to read when current > end with 
inclusive end",
+                      current.isMoreToReadBeforeReadingRecord(end, false));
+  }
+
+  @Test
+  public void 
testIsMoreToReadBeforeReadingRecord_nullEndSequenceNumber_exclusiveEnd()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(5L, false);
+    TestSequenceNumber end = new TestSequenceNumber(null, false);
+
+    Assert.assertFalse("Should return false when end sequence number is null",
+                      current.isMoreToReadBeforeReadingRecord(end, true));
+  }
+
+  @Test
+  public void 
testIsMoreToReadBeforeReadingRecord_nullEndSequenceNumber_inclusiveEnd()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(5L, false);
+    TestSequenceNumber end = new TestSequenceNumber(null, false);
+
+    Assert.assertFalse("Should return false when end sequence number is null",
+                      current.isMoreToReadBeforeReadingRecord(end, false));
+  }
+
+  @Test
+  public void 
testIsMoreToReadBeforeReadingRecord_nullCurrentSequenceNumber_exclusiveEnd()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(null, false);
+    TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+    Assert.assertThrows(NullPointerException.class, () -> 
current.isMoreToReadBeforeReadingRecord(end, true));
+  }
+
+  @Test
+  public void 
testIsMoreToReadBeforeReadingRecord_nullCurrentSequenceNumber_inclusiveEnd()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(null, false);
+    TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+    Assert.assertThrows(NullPointerException.class, () -> 
current.isMoreToReadBeforeReadingRecord(end, false));

Review Comment:
   Please put args in separate lines.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -117,6 +122,8 @@ public SeekableStreamSupervisorSpec(
     this.rowIngestionMetersFactory = rowIngestionMetersFactory;
     this.suspended = suspended != null ? suspended : false;
     this.supervisorStateManagerConfig = supervisorStateManagerConfig;
+    this.usePersistentTasks = Configs.valueOrDefault(usePersistentTasks, 
false);
+    this.version = DateTimes.nowUtc().toString();

Review Comment:
   We should not create a new version here; we should use the persisted version 
instead.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -212,6 +218,21 @@ public class TaskGroup
 
     boolean handoffEarly = false; // set by 
SupervisorManager.stopTaskGroupEarly
 
+    public int getId()
+    {
+      return groupId;
+    }
+
+    public DateTime getMinimumMessageTime()
+    {
+      return minimumMessageTime;
+    }
+
+    public DateTime getMaximumMessageTime()

Review Comment:
   Where are these getters used?



##########
extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java:
##########
@@ -121,9 +121,9 @@ protected RecordSupplier<String, Long, ByteEntity> 
setupRecordSupplier()
     RabbitStreamIndexTaskTuningConfig taskTuningConfig = 
spec.getTuningConfig();
 
     return new RabbitStreamRecordSupplier(
-      spec.getIoConfig().getConsumerProperties(),
+      spec.getSpec().getIOConfig().getConsumerProperties(),

Review Comment:
   Gentle reminder: please revert these refactors from this PR.
   We can do these in a separate follow up PR.



##########
indexing-service/src/test/java/org/apache/druid/metadata/MetadataStorageActionHandlerTest.java:
##########
@@ -148,6 +148,12 @@ public void populateTaskTypeAndGroupIdAsync()
       {
 
       }
+
+      @Override
+      public void update(String id, Task entry)

Review Comment:
   Please remove this.



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java:
##########
@@ -113,4 +114,9 @@ default void validateSpecUpdateTo(SupervisorSpec 
proposedSpec) throws DruidExcep
   {
     // The default implementation does not do any validation checks.
   }
+
+  default Optional<String> getVersion()

Review Comment:
   We shouldn't put version into the `SupervisorSpec` since there is already a 
`VersionedSupervisorSpec`.
   You can maintain the version as a separate variable in the supervisor or 
just use a `VersionedSupervisorSpec`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -251,6 +256,8 @@ public enum Status
   private volatile DateTime minMessageTime;
   private volatile DateTime maxMessageTime;
   private final ScheduledExecutorService rejectionPeriodUpdaterExec;
+  private final AtomicBoolean waitForConfigUpdate = new AtomicBoolean(false);
+

Review Comment:
   Nit: remove the extra newline.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -78,6 +80,8 @@ private static SeekableStreamSupervisorIngestionSpec 
checkIngestionSchema(
   protected final DruidMonitorSchedulerConfig monitorSchedulerConfig;
   private final boolean suspended;
   protected final SupervisorStateManagerConfig supervisorStateManagerConfig;
+  protected final boolean usePersistentTasks;
+  protected String version;

Review Comment:
   Version should not be present here.



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java:
##########
@@ -744,6 +744,12 @@ private TestTask(String id, Interval interval, Map<String, 
Object> context)
       this.interval = interval;
     }
 
+    private TestTask(String id, String dataSource, Interval interval, 
Map<String, Object> context)

Review Comment:
   Is this needed anymore?



##########
extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java:
##########
@@ -82,7 +82,8 @@ public RabbitStreamSupervisorSpec(
         emitter,
         monitorSchedulerConfig,
         rowIngestionMetersFactory,
-        supervisorStateManagerConfig);
+        supervisorStateManagerConfig,
+        null);

Review Comment:
   ```suggestion
           null
       );
   ```



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumberTest.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.common;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.validation.constraints.NotNull;
+
+public class OrderedSequenceNumberTest
+{
+  @Test
+  public void test_isMoreToReadBeforeReadingRecord_exclusiveEnd_lessThan()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(5L, false);
+    TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+    Assert.assertTrue("Should have more to read when current < end with 
exclusive end",
+                     current.isMoreToReadBeforeReadingRecord(end, true));
+  }
+
+  @Test
+  public void test_isMoreToReadBeforeReadingRecord_exclusiveEnd_equalTo()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(10L, false);
+    TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+    Assert.assertFalse("Should NOT have more to read when current == end with 
exclusive end",
+                      current.isMoreToReadBeforeReadingRecord(end, true));
+  }
+
+  @Test
+  public void testIsMoreToReadBeforeReadingRecord_exclusiveEnd_greaterThan()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(15L, false);
+    TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+    Assert.assertFalse("Should NOT have more to read when current > end with 
exclusive end",
+                      current.isMoreToReadBeforeReadingRecord(end, true));
+  }
+
+  @Test
+  public void testIsMoreToReadBeforeReadingRecord_inclusiveEnd_lessThan()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(5L, false);
+    TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+    Assert.assertTrue("Should have more to read when current < end with 
inclusive end",
+                     current.isMoreToReadBeforeReadingRecord(end, false));
+  }
+
+  @Test
+  public void testIsMoreToReadBeforeReadingRecord_inclusiveEnd_equalTo()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(10L, false);
+    TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+    Assert.assertTrue("Should have more to read when current == end with 
inclusive end",
+                     current.isMoreToReadBeforeReadingRecord(end, false));
+  }
+
+  @Test
+  public void testIsMoreToReadBeforeReadingRecord_inclusiveEnd_greaterThan()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(15L, false);
+    TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+    Assert.assertFalse("Should NOT have more to read when current > end with 
inclusive end",
+                      current.isMoreToReadBeforeReadingRecord(end, false));
+  }
+
+  @Test
+  public void 
testIsMoreToReadBeforeReadingRecord_nullEndSequenceNumber_exclusiveEnd()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(5L, false);
+    TestSequenceNumber end = new TestSequenceNumber(null, false);
+
+    Assert.assertFalse("Should return false when end sequence number is null",
+                      current.isMoreToReadBeforeReadingRecord(end, true));
+  }
+
+  @Test
+  public void 
testIsMoreToReadBeforeReadingRecord_nullEndSequenceNumber_inclusiveEnd()
+  {
+    TestSequenceNumber current = new TestSequenceNumber(5L, false);
+    TestSequenceNumber end = new TestSequenceNumber(null, false);
+
+    Assert.assertFalse("Should return false when end sequence number is null",
+                      current.isMoreToReadBeforeReadingRecord(end, false));

Review Comment:
   Please use this formatting style in all the assertions in this class.
   
   ```suggestion
       Assert.assertFalse(
           "Should return false when end sequence number is null",
           current.isMoreToReadBeforeReadingRecord(end, false)
       );
   ```



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -226,6 +226,70 @@ protected SeekableStreamIndexTaskIOConfig 
createTaskIoConfig(
     );
   }
 
+  @Override
+  protected SeekableStreamIndexTaskIOConfig<KafkaTopicPartition, Long> 
createUpdatedTaskIoConfig(
+      Set<KafkaTopicPartition> partitions,
+      TaskGroup existingTaskGroup,
+      Map<KafkaTopicPartition, Long> latestCommittedOffsets,
+      Map<KafkaTopicPartition, Long> latestTaskOffsetsOnPause
+  )
+  {
+    log.info("Creating updated task IO config for task group [%s]", 
existingTaskGroup.getId());

Review Comment:
   This should be removed as it can get pretty noisy.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java:
##########
@@ -85,7 +87,9 @@ public SeekableStreamIndexTask(
       final SeekableStreamIndexTaskTuningConfig tuningConfig,
       final SeekableStreamIndexTaskIOConfig<PartitionIdType, 
SequenceOffsetType> ioConfig,
       @Nullable final Map<String, Object> context,
-      @Nullable final String groupId
+      @Nullable final String groupId,
+      @Nullable final Boolean isPerpetuallyRunning,

Review Comment:
   For tasks, use the field name `isPersistent`.
   For supervisor and supervisor spec, use the field name `usePersistentTasks`.
   



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,10 +1739,146 @@ public Response getUnparseableEvents(
     return 
Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }
 
+  @POST
+  @Path("/config")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(

Review Comment:
   All other methods seem to follow this convention.
   ```suggestion
     public Response updateConfigHTTP(
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java:
##########
@@ -106,6 +110,8 @@ public SeekableStreamIndexTask(
                                 : LockGranularity.SEGMENT;
     this.lockTypeToUse = TaskLocks.determineLockTypeForAppend(getContext());
     this.supervisorId = 
Preconditions.checkNotNull(Configs.valueOrDefault(supervisorId, 
dataSchema.getDataSource()), "supervisorId");
+    this.isPerpetuallyRunning = Configs.valueOrDefault(isPerpetuallyRunning, 
false);
+    this.supervisorSpecVersion = Configs.valueOrDefault(supervisorSpecVersion, 
"");

Review Comment:
   Assigning to empty serves no purpose, let it be null if not specified.
   ```suggestion
       this.supervisorSpecVersion = supervisorSpecVersion;
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -195,14 +198,17 @@ public class TaskGroup
     // this task group has completed successfully, at which point this will be 
destroyed and a new task group will be
     // created with new starting sequences. This allows us to create 
replacement tasks for failed tasks that process the
     // same sequences, even if the values in [partitionGroups] has been 
changed.
-    final ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences;
+    // In perpetually-running tasks mode, the actively running task groups 
will be replaced with new task groups with updated starting sequences.
+    ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences;
 
     // We don't include closed partitions in the starting offsets. However, we 
keep the full unfiltered map of
     // partitions, only used for generating the sequence name, to avoid 
ambiguity in sequence names if multiple
     // task groups have nothing but closed partitions in their assignments.
+

Review Comment:
   extra newline



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -195,14 +198,17 @@ public class TaskGroup
     // this task group has completed successfully, at which point this will be 
destroyed and a new task group will be
     // created with new starting sequences. This allows us to create 
replacement tasks for failed tasks that process the
     // same sequences, even if the values in [partitionGroups] has been 
changed.
-    final ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences;
+    // In perpetually-running tasks mode, the actively running task groups 
will be replaced with new task groups with updated starting sequences.

Review Comment:
   ```suggestion
       // With persistent tasks, the actively running task groups will be 
replaced with new task groups with updated starting sequences.
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java:
##########
@@ -153,6 +159,18 @@ public SeekableStreamIndexTaskIOConfig<PartitionIdType, 
SequenceOffsetType> getI
     return ioConfig;
   }
 
+  @JsonProperty("isPerpetuallyRunning")

Review Comment:
   ```suggestion
     @JsonProperty
   ```



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java:
##########
@@ -391,7 +394,10 @@ private KafkaSupervisorSpec createKafkaSupervisor(
     return MoreResources.Supervisor.KAFKA_JSON
         .get()
         .withDataSchema(schema -> schema.withTimestamp(new 
TimestampSpec("timestamp", "iso", null)))
-        .withTuningConfig(tuningConfig -> 
tuningConfig.withMaxRowsPerSegment(maxRowsPerSegment))
+        .withTuningConfig(tuningConfig -> tuningConfig

Review Comment:
   Please revert all the changes to this file.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java:
##########
@@ -106,6 +110,8 @@ public SeekableStreamIndexTask(
                                 : LockGranularity.SEGMENT;
     this.lockTypeToUse = TaskLocks.determineLockTypeForAppend(getContext());
     this.supervisorId = 
Preconditions.checkNotNull(Configs.valueOrDefault(supervisorId, 
dataSchema.getDataSource()), "supervisorId");
+    this.isPerpetuallyRunning = Configs.valueOrDefault(isPerpetuallyRunning, 
false);

Review Comment:
   The `false` here should probably be a public constant in 
`SeekableStreamSupervisor` so that both the supervisor spec and the task can 
use this default.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java:
##########
@@ -153,6 +159,18 @@ public SeekableStreamIndexTaskIOConfig<PartitionIdType, 
SequenceOffsetType> getI
     return ioConfig;
   }
 
+  @JsonProperty("isPerpetuallyRunning")
+  public boolean isPerpetuallyRunning()
+  {
+    return isPerpetuallyRunning;
+  }
+
+  @JsonProperty("supervisorSpecVersion")

Review Comment:
   ```suggestion
     @JsonProperty
   ```



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaTaskScalingTest.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.indexing;
+
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import 
org.apache.druid.indexing.kafka.supervisor.LagBasedAutoScalerConfigBuilder;
+import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.joda.time.Period;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Embedded test to verify task scaling behaviour of {@code KafkaSupervisor} 
ingesting from a custom kafka topic.
+ */
+@SuppressWarnings("resource")
+public class KafkaTaskScalingTest extends EmbeddedClusterTestBase

Review Comment:
   Why is this class separate from `KafkaTaskAutoScalingTest`? We can merge the 
two classes into one.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -571,29 +599,306 @@ private boolean changeTaskCount(int 
desiredActiveTaskCount)
           dataSource
       );
       final Stopwatch scaleActionStopwatch = Stopwatch.createStarted();
-      gracefulShutdownInternal();
-      changeTaskCountInIOConfig(desiredActiveTaskCount);
-      clearAllocationInfo();
-      emitter.emit(ServiceMetricEvent.builder()
-                                     .setDimension(DruidMetrics.SUPERVISOR_ID, 
supervisorId)
-                                     .setDimension(DruidMetrics.DATASOURCE, 
dataSource)
-                                     .setDimension(DruidMetrics.STREAM, 
getIoConfig().getStream())
-                                     .setDimensionIfNotNull(
-                                         DruidMetrics.TAGS,
-                                         
spec.getContextValue(DruidMetrics.TAGS)
-                                     )
-                                     .setMetric(
-                                         AUTOSCALER_SCALING_TIME_METRIC,
-                                         scaleActionStopwatch.millisElapsed()
-                                     ));
+      
+      if (spec.usePersistentTasks()) {
+        return changeTaskCountForPerpetualTasks(desiredActiveTaskCount, 
successfulScaleAutoScalerCallback);
+      } else {
+        gracefulShutdownInternal();
+        changeTaskCountInIOConfig(desiredActiveTaskCount);
+        clearAllocationInfo();
+      }
+      emitAutoScalerRunMetric(scaleActionStopwatch);
       log.info("Changed taskCount to [%s] for supervisor[%s] for 
dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource);
       return true;
     }
   }
 
+  private void emitAutoScalerRunMetric(Stopwatch scaleActionStopwatch)
+  {
+    emitter.emit(ServiceMetricEvent.builder()
+                                   .setDimension(DruidMetrics.SUPERVISOR_ID, 
supervisorId)
+                                   .setDimension(DruidMetrics.DATASOURCE, 
dataSource)
+                                   .setDimension(DruidMetrics.STREAM, 
getIoConfig().getStream())
+                                   .setDimensionIfNotNull(
+                                       DruidMetrics.TAGS,
+                                       spec.getContextValue(DruidMetrics.TAGS)
+                                   )
+                                   .setMetric(
+                                       AUTOSCALER_SCALING_TIME_METRIC,
+                                       scaleActionStopwatch.millisElapsed()
+                                   ));
+  }
+
+  /**
+   * Handles task count changes for perpetual tasks using updateConfig instead 
of graceful shutdown.
+   * This approach pauses tasks, recalculates partition assignments, and sends 
config updates.
+   */
+  private boolean changeTaskCountForPerpetualTasks(int desiredActiveTaskCount,
+                                                   Runnable 
successfulScaleAutoScalerCallback
+  )

Review Comment:
   Please fix the formatting.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -571,29 +599,306 @@ private boolean changeTaskCount(int 
desiredActiveTaskCount)
           dataSource
       );
       final Stopwatch scaleActionStopwatch = Stopwatch.createStarted();
-      gracefulShutdownInternal();
-      changeTaskCountInIOConfig(desiredActiveTaskCount);
-      clearAllocationInfo();
-      emitter.emit(ServiceMetricEvent.builder()
-                                     .setDimension(DruidMetrics.SUPERVISOR_ID, 
supervisorId)
-                                     .setDimension(DruidMetrics.DATASOURCE, 
dataSource)
-                                     .setDimension(DruidMetrics.STREAM, 
getIoConfig().getStream())
-                                     .setDimensionIfNotNull(
-                                         DruidMetrics.TAGS,
-                                         
spec.getContextValue(DruidMetrics.TAGS)
-                                     )
-                                     .setMetric(
-                                         AUTOSCALER_SCALING_TIME_METRIC,
-                                         scaleActionStopwatch.millisElapsed()
-                                     ));
+      
+      if (spec.usePersistentTasks()) {
+        return changeTaskCountForPerpetualTasks(desiredActiveTaskCount, 
successfulScaleAutoScalerCallback);
+      } else {
+        gracefulShutdownInternal();
+        changeTaskCountInIOConfig(desiredActiveTaskCount);
+        clearAllocationInfo();
+      }
+      emitAutoScalerRunMetric(scaleActionStopwatch);
       log.info("Changed taskCount to [%s] for supervisor[%s] for 
dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource);
       return true;
     }
   }
 
+  private void emitAutoScalerRunMetric(Stopwatch scaleActionStopwatch)
+  {
+    emitter.emit(ServiceMetricEvent.builder()
+                                   .setDimension(DruidMetrics.SUPERVISOR_ID, 
supervisorId)
+                                   .setDimension(DruidMetrics.DATASOURCE, 
dataSource)
+                                   .setDimension(DruidMetrics.STREAM, 
getIoConfig().getStream())
+                                   .setDimensionIfNotNull(
+                                       DruidMetrics.TAGS,
+                                       spec.getContextValue(DruidMetrics.TAGS)
+                                   )
+                                   .setMetric(
+                                       AUTOSCALER_SCALING_TIME_METRIC,
+                                       scaleActionStopwatch.millisElapsed()
+                                   ));
+  }
+
+  /**
+   * Handles task count changes for perpetual tasks using updateConfig instead 
of graceful shutdown.
+   * This approach pauses tasks, recalculates partition assignments, and sends 
config updates.
+   */
+  private boolean changeTaskCountForPerpetualTasks(int desiredActiveTaskCount,
+                                                   Runnable 
successfulScaleAutoScalerCallback
+  )

Review Comment:
   Please fix the formatting.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -571,29 +599,306 @@ private boolean changeTaskCount(int 
desiredActiveTaskCount)
           dataSource
       );
       final Stopwatch scaleActionStopwatch = Stopwatch.createStarted();
-      gracefulShutdownInternal();
-      changeTaskCountInIOConfig(desiredActiveTaskCount);
-      clearAllocationInfo();
-      emitter.emit(ServiceMetricEvent.builder()
-                                     .setDimension(DruidMetrics.SUPERVISOR_ID, 
supervisorId)
-                                     .setDimension(DruidMetrics.DATASOURCE, 
dataSource)
-                                     .setDimension(DruidMetrics.STREAM, 
getIoConfig().getStream())
-                                     .setDimensionIfNotNull(
-                                         DruidMetrics.TAGS,
-                                         
spec.getContextValue(DruidMetrics.TAGS)
-                                     )
-                                     .setMetric(
-                                         AUTOSCALER_SCALING_TIME_METRIC,
-                                         scaleActionStopwatch.millisElapsed()
-                                     ));

Review Comment:
   Let's revert this change for now as it is just moving the code.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1716,6 +2042,10 @@ public TaskGroup 
addTaskGroupToPendingCompletionTaskGroup(
   @VisibleForTesting
   public void runInternal()
   {
+    if (isDynamicAllocationOngoing.get()) {
+      log.info("Skipping run because dynamic allocation is ongoing.");

Review Comment:
   This can get noisy as task scaling can be a slow operation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to