kfaraz commented on code in PR #18466:
URL: https://github.com/apache/druid/pull/18466#discussion_r2378645919
##########
indexing-service/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java:
##########
@@ -556,4 +556,30 @@ private void verifyTaskStatus(TaskStatus expected,
TaskStatus actual)
{
Assert.assertEquals(expected, actual);
}
+
+ @Test
+ public void testUpdateTask()
Review Comment:
I don't think this is needed anymore.
##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumberTest.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.common;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.validation.constraints.NotNull;
+
+public class OrderedSequenceNumberTest
+{
+ @Test
+ public void test_isMoreToReadBeforeReadingRecord_exclusiveEnd_lessThan()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(5L, false);
+ TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+ Assert.assertTrue("Should have more to read when current < end with
exclusive end",
+ current.isMoreToReadBeforeReadingRecord(end, true));
+ }
+
+ @Test
+ public void test_isMoreToReadBeforeReadingRecord_exclusiveEnd_equalTo()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(10L, false);
+ TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+ Assert.assertFalse("Should NOT have more to read when current == end with
exclusive end",
+ current.isMoreToReadBeforeReadingRecord(end, true));
+ }
+
+ @Test
+ public void testIsMoreToReadBeforeReadingRecord_exclusiveEnd_greaterThan()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(15L, false);
+ TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+ Assert.assertFalse("Should NOT have more to read when current > end with
exclusive end",
+ current.isMoreToReadBeforeReadingRecord(end, true));
+ }
+
+ @Test
+ public void testIsMoreToReadBeforeReadingRecord_inclusiveEnd_lessThan()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(5L, false);
+ TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+ Assert.assertTrue("Should have more to read when current < end with
inclusive end",
+ current.isMoreToReadBeforeReadingRecord(end, false));
+ }
+
+ @Test
+ public void testIsMoreToReadBeforeReadingRecord_inclusiveEnd_equalTo()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(10L, false);
+ TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+ Assert.assertTrue("Should have more to read when current == end with
inclusive end",
+ current.isMoreToReadBeforeReadingRecord(end, false));
+ }
+
+ @Test
+ public void testIsMoreToReadBeforeReadingRecord_inclusiveEnd_greaterThan()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(15L, false);
+ TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+ Assert.assertFalse("Should NOT have more to read when current > end with
inclusive end",
+ current.isMoreToReadBeforeReadingRecord(end, false));
+ }
+
+ @Test
+ public void
testIsMoreToReadBeforeReadingRecord_nullEndSequenceNumber_exclusiveEnd()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(5L, false);
+ TestSequenceNumber end = new TestSequenceNumber(null, false);
+
+ Assert.assertFalse("Should return false when end sequence number is null",
+ current.isMoreToReadBeforeReadingRecord(end, true));
+ }
+
+ @Test
+ public void
testIsMoreToReadBeforeReadingRecord_nullEndSequenceNumber_inclusiveEnd()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(5L, false);
+ TestSequenceNumber end = new TestSequenceNumber(null, false);
+
+ Assert.assertFalse("Should return false when end sequence number is null",
+ current.isMoreToReadBeforeReadingRecord(end, false));
+ }
+
+ @Test
+ public void
testIsMoreToReadBeforeReadingRecord_nullCurrentSequenceNumber_exclusiveEnd()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(null, false);
+ TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+ Assert.assertThrows(NullPointerException.class, () ->
current.isMoreToReadBeforeReadingRecord(end, true));
+ }
+
+ @Test
+ public void
testIsMoreToReadBeforeReadingRecord_nullCurrentSequenceNumber_inclusiveEnd()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(null, false);
+ TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+ Assert.assertThrows(NullPointerException.class, () ->
current.isMoreToReadBeforeReadingRecord(end, false));
Review Comment:
Please put the arguments on separate lines.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -117,6 +122,8 @@ public SeekableStreamSupervisorSpec(
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
this.suspended = suspended != null ? suspended : false;
this.supervisorStateManagerConfig = supervisorStateManagerConfig;
+ this.usePersistentTasks = Configs.valueOrDefault(usePersistentTasks,
false);
+ this.version = DateTimes.nowUtc().toString();
Review Comment:
We should not create a new version here; we should use the persisted version
instead.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -212,6 +218,21 @@ public class TaskGroup
boolean handoffEarly = false; // set by
SupervisorManager.stopTaskGroupEarly
+ public int getId()
+ {
+ return groupId;
+ }
+
+ public DateTime getMinimumMessageTime()
+ {
+ return minimumMessageTime;
+ }
+
+ public DateTime getMaximumMessageTime()
Review Comment:
Where are these getters used?
##########
extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java:
##########
@@ -121,9 +121,9 @@ protected RecordSupplier<String, Long, ByteEntity>
setupRecordSupplier()
RabbitStreamIndexTaskTuningConfig taskTuningConfig =
spec.getTuningConfig();
return new RabbitStreamRecordSupplier(
- spec.getIoConfig().getConsumerProperties(),
+ spec.getSpec().getIOConfig().getConsumerProperties(),
Review Comment:
Gentle reminder: please revert these refactors from this PR.
We can do these in a separate follow up PR.
##########
indexing-service/src/test/java/org/apache/druid/metadata/MetadataStorageActionHandlerTest.java:
##########
@@ -148,6 +148,12 @@ public void populateTaskTypeAndGroupIdAsync()
{
}
+
+ @Override
+ public void update(String id, Task entry)
Review Comment:
Please remove this.
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java:
##########
@@ -113,4 +114,9 @@ default void validateSpecUpdateTo(SupervisorSpec
proposedSpec) throws DruidExcep
{
// The default implementation does not do any validation checks.
}
+
+ default Optional<String> getVersion()
Review Comment:
We shouldn't put version into the `SupervisorSpec` since there is already a
`VersionedSupervisorSpec`.
You can maintain the version as a separate variable in the supervisor or
just use a `VersionedSupervisorSpec`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -251,6 +256,8 @@ public enum Status
private volatile DateTime minMessageTime;
private volatile DateTime maxMessageTime;
private final ScheduledExecutorService rejectionPeriodUpdaterExec;
+ private final AtomicBoolean waitForConfigUpdate = new AtomicBoolean(false);
+
Review Comment:
Nit: remove the extra newline.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -78,6 +80,8 @@ private static SeekableStreamSupervisorIngestionSpec
checkIngestionSchema(
protected final DruidMonitorSchedulerConfig monitorSchedulerConfig;
private final boolean suspended;
protected final SupervisorStateManagerConfig supervisorStateManagerConfig;
+ protected final boolean usePersistentTasks;
+ protected String version;
Review Comment:
Version should not be present here.
##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java:
##########
@@ -744,6 +744,12 @@ private TestTask(String id, Interval interval, Map<String,
Object> context)
this.interval = interval;
}
+ private TestTask(String id, String dataSource, Interval interval,
Map<String, Object> context)
Review Comment:
Is this needed anymore?
##########
extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java:
##########
@@ -82,7 +82,8 @@ public RabbitStreamSupervisorSpec(
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
- supervisorStateManagerConfig);
+ supervisorStateManagerConfig,
+ null);
Review Comment:
```suggestion
null
);
```
##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumberTest.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.common;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.validation.constraints.NotNull;
+
+public class OrderedSequenceNumberTest
+{
+ @Test
+ public void test_isMoreToReadBeforeReadingRecord_exclusiveEnd_lessThan()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(5L, false);
+ TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+ Assert.assertTrue("Should have more to read when current < end with
exclusive end",
+ current.isMoreToReadBeforeReadingRecord(end, true));
+ }
+
+ @Test
+ public void test_isMoreToReadBeforeReadingRecord_exclusiveEnd_equalTo()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(10L, false);
+ TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+ Assert.assertFalse("Should NOT have more to read when current == end with
exclusive end",
+ current.isMoreToReadBeforeReadingRecord(end, true));
+ }
+
+ @Test
+ public void testIsMoreToReadBeforeReadingRecord_exclusiveEnd_greaterThan()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(15L, false);
+ TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+ Assert.assertFalse("Should NOT have more to read when current > end with
exclusive end",
+ current.isMoreToReadBeforeReadingRecord(end, true));
+ }
+
+ @Test
+ public void testIsMoreToReadBeforeReadingRecord_inclusiveEnd_lessThan()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(5L, false);
+ TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+ Assert.assertTrue("Should have more to read when current < end with
inclusive end",
+ current.isMoreToReadBeforeReadingRecord(end, false));
+ }
+
+ @Test
+ public void testIsMoreToReadBeforeReadingRecord_inclusiveEnd_equalTo()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(10L, false);
+ TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+ Assert.assertTrue("Should have more to read when current == end with
inclusive end",
+ current.isMoreToReadBeforeReadingRecord(end, false));
+ }
+
+ @Test
+ public void testIsMoreToReadBeforeReadingRecord_inclusiveEnd_greaterThan()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(15L, false);
+ TestSequenceNumber end = new TestSequenceNumber(10L, false);
+
+ Assert.assertFalse("Should NOT have more to read when current > end with
inclusive end",
+ current.isMoreToReadBeforeReadingRecord(end, false));
+ }
+
+ @Test
+ public void
testIsMoreToReadBeforeReadingRecord_nullEndSequenceNumber_exclusiveEnd()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(5L, false);
+ TestSequenceNumber end = new TestSequenceNumber(null, false);
+
+ Assert.assertFalse("Should return false when end sequence number is null",
+ current.isMoreToReadBeforeReadingRecord(end, true));
+ }
+
+ @Test
+ public void
testIsMoreToReadBeforeReadingRecord_nullEndSequenceNumber_inclusiveEnd()
+ {
+ TestSequenceNumber current = new TestSequenceNumber(5L, false);
+ TestSequenceNumber end = new TestSequenceNumber(null, false);
+
+ Assert.assertFalse("Should return false when end sequence number is null",
+ current.isMoreToReadBeforeReadingRecord(end, false));
Review Comment:
Please use this formatting style in all the assertions in this class.
```suggestion
Assert.assertFalse(
"Should return false when end sequence number is null",
current.isMoreToReadBeforeReadingRecord(end, false)
);
```
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -226,6 +226,70 @@ protected SeekableStreamIndexTaskIOConfig
createTaskIoConfig(
);
}
+ @Override
+ protected SeekableStreamIndexTaskIOConfig<KafkaTopicPartition, Long>
createUpdatedTaskIoConfig(
+ Set<KafkaTopicPartition> partitions,
+ TaskGroup existingTaskGroup,
+ Map<KafkaTopicPartition, Long> latestCommittedOffsets,
+ Map<KafkaTopicPartition, Long> latestTaskOffsetsOnPause
+ )
+ {
+ log.info("Creating updated task IO config for task group [%s]",
existingTaskGroup.getId());
Review Comment:
This should be removed as it can get pretty noisy.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java:
##########
@@ -85,7 +87,9 @@ public SeekableStreamIndexTask(
final SeekableStreamIndexTaskTuningConfig tuningConfig,
final SeekableStreamIndexTaskIOConfig<PartitionIdType,
SequenceOffsetType> ioConfig,
@Nullable final Map<String, Object> context,
- @Nullable final String groupId
+ @Nullable final String groupId,
+ @Nullable final Boolean isPerpetuallyRunning,
Review Comment:
For tasks, use the field name `isPersistent`.
For supervisor and supervisor spec, use the field name `usePersistentTasks`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,10 +1739,146 @@ public Response getUnparseableEvents(
return
Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
}
+ @POST
+ @Path("/config")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response updateConfig(
Review Comment:
Since all other methods seem to follow this convention.
```suggestion
public Response updateConfigHTTP(
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java:
##########
@@ -106,6 +110,8 @@ public SeekableStreamIndexTask(
: LockGranularity.SEGMENT;
this.lockTypeToUse = TaskLocks.determineLockTypeForAppend(getContext());
this.supervisorId =
Preconditions.checkNotNull(Configs.valueOrDefault(supervisorId,
dataSchema.getDataSource()), "supervisorId");
+ this.isPerpetuallyRunning = Configs.valueOrDefault(isPerpetuallyRunning,
false);
+ this.supervisorSpecVersion = Configs.valueOrDefault(supervisorSpecVersion,
"");
Review Comment:
Assigning to empty serves no purpose, let it be null if not specified.
```suggestion
this.supervisorSpecVersion = supervisorSpecVersion;
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -195,14 +198,17 @@ public class TaskGroup
// this task group has completed successfully, at which point this will be
destroyed and a new task group will be
// created with new starting sequences. This allows us to create
replacement tasks for failed tasks that process the
// same sequences, even if the values in [partitionGroups] has been
changed.
- final ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences;
+ // In perpetually-running tasks mode, the actively running task groups
will be replaced with new task groups with updated starting sequences.
+ ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences;
// We don't include closed partitions in the starting offsets. However, we
keep the full unfiltered map of
// partitions, only used for generating the sequence name, to avoid
ambiguity in sequence names if mulitple
// task groups have nothing but closed partitions in their assignments.
+
Review Comment:
extra newline
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -195,14 +198,17 @@ public class TaskGroup
// this task group has completed successfully, at which point this will be
destroyed and a new task group will be
// created with new starting sequences. This allows us to create
replacement tasks for failed tasks that process the
// same sequences, even if the values in [partitionGroups] has been
changed.
- final ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences;
+ // In perpetually-running tasks mode, the actively running task groups
will be replaced with new task groups with updated starting sequences.
Review Comment:
```suggestion
// With persistent tasks, the actively running task groups will be
replaced with new task groups with updated starting sequences.
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java:
##########
@@ -153,6 +159,18 @@ public SeekableStreamIndexTaskIOConfig<PartitionIdType,
SequenceOffsetType> getI
return ioConfig;
}
+ @JsonProperty("isPerpetuallyRunning")
Review Comment:
```suggestion
@JsonProperty
```
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java:
##########
@@ -391,7 +394,10 @@ private KafkaSupervisorSpec createKafkaSupervisor(
return MoreResources.Supervisor.KAFKA_JSON
.get()
.withDataSchema(schema -> schema.withTimestamp(new
TimestampSpec("timestamp", "iso", null)))
- .withTuningConfig(tuningConfig ->
tuningConfig.withMaxRowsPerSegment(maxRowsPerSegment))
+ .withTuningConfig(tuningConfig -> tuningConfig
Review Comment:
Please revert all the changes to this file.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java:
##########
@@ -106,6 +110,8 @@ public SeekableStreamIndexTask(
: LockGranularity.SEGMENT;
this.lockTypeToUse = TaskLocks.determineLockTypeForAppend(getContext());
this.supervisorId =
Preconditions.checkNotNull(Configs.valueOrDefault(supervisorId,
dataSchema.getDataSource()), "supervisorId");
+ this.isPerpetuallyRunning = Configs.valueOrDefault(isPerpetuallyRunning,
false);
Review Comment:
The `false` here should probably be a public constant in
`SeekableStreamSupervisor` so that both the supervisor spec and the task can
use this default.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java:
##########
@@ -153,6 +159,18 @@ public SeekableStreamIndexTaskIOConfig<PartitionIdType,
SequenceOffsetType> getI
return ioConfig;
}
+ @JsonProperty("isPerpetuallyRunning")
+ public boolean isPerpetuallyRunning()
+ {
+ return isPerpetuallyRunning;
+ }
+
+ @JsonProperty("supervisorSpecVersion")
Review Comment:
```suggestion
@JsonProperty
```
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaTaskScalingTest.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.indexing;
+
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import
org.apache.druid.indexing.kafka.supervisor.LagBasedAutoScalerConfigBuilder;
+import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.joda.time.Period;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Embedded test to verify task scaling behaviour of {@code KafkaSupervisor}
ingesting from a custom kafka topic.
+ */
+@SuppressWarnings("resource")
+public class KafkaTaskScalingTest extends EmbeddedClusterTestBase
Review Comment:
Why is this class separate from `KafkaTaskAutoScalingTest`? We can merge the
two classes into one.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -571,29 +599,306 @@ private boolean changeTaskCount(int
desiredActiveTaskCount)
dataSource
);
final Stopwatch scaleActionStopwatch = Stopwatch.createStarted();
- gracefulShutdownInternal();
- changeTaskCountInIOConfig(desiredActiveTaskCount);
- clearAllocationInfo();
- emitter.emit(ServiceMetricEvent.builder()
- .setDimension(DruidMetrics.SUPERVISOR_ID,
supervisorId)
- .setDimension(DruidMetrics.DATASOURCE,
dataSource)
- .setDimension(DruidMetrics.STREAM,
getIoConfig().getStream())
- .setDimensionIfNotNull(
- DruidMetrics.TAGS,
-
spec.getContextValue(DruidMetrics.TAGS)
- )
- .setMetric(
- AUTOSCALER_SCALING_TIME_METRIC,
- scaleActionStopwatch.millisElapsed()
- ));
+
+ if (spec.usePersistentTasks()) {
+ return changeTaskCountForPerpetualTasks(desiredActiveTaskCount,
successfulScaleAutoScalerCallback);
+ } else {
+ gracefulShutdownInternal();
+ changeTaskCountInIOConfig(desiredActiveTaskCount);
+ clearAllocationInfo();
+ }
+ emitAutoScalerRunMetric(scaleActionStopwatch);
log.info("Changed taskCount to [%s] for supervisor[%s] for
dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource);
return true;
}
}
+ private void emitAutoScalerRunMetric(Stopwatch scaleActionStopwatch)
+ {
+ emitter.emit(ServiceMetricEvent.builder()
+ .setDimension(DruidMetrics.SUPERVISOR_ID,
supervisorId)
+ .setDimension(DruidMetrics.DATASOURCE,
dataSource)
+ .setDimension(DruidMetrics.STREAM,
getIoConfig().getStream())
+ .setDimensionIfNotNull(
+ DruidMetrics.TAGS,
+ spec.getContextValue(DruidMetrics.TAGS)
+ )
+ .setMetric(
+ AUTOSCALER_SCALING_TIME_METRIC,
+ scaleActionStopwatch.millisElapsed()
+ ));
+ }
+
+ /**
+ * Handles task count changes for perpetual tasks using updateConfig instead
of graceful shutdown.
+ * This approach pauses tasks, recalculates partition assignments, and sends
config updates.
+ */
+ private boolean changeTaskCountForPerpetualTasks(int desiredActiveTaskCount,
+ Runnable
successfulScaleAutoScalerCallback
+ )
Review Comment:
Please fix the formatting.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -571,29 +599,306 @@ private boolean changeTaskCount(int
desiredActiveTaskCount)
dataSource
);
final Stopwatch scaleActionStopwatch = Stopwatch.createStarted();
- gracefulShutdownInternal();
- changeTaskCountInIOConfig(desiredActiveTaskCount);
- clearAllocationInfo();
- emitter.emit(ServiceMetricEvent.builder()
- .setDimension(DruidMetrics.SUPERVISOR_ID,
supervisorId)
- .setDimension(DruidMetrics.DATASOURCE,
dataSource)
- .setDimension(DruidMetrics.STREAM,
getIoConfig().getStream())
- .setDimensionIfNotNull(
- DruidMetrics.TAGS,
-
spec.getContextValue(DruidMetrics.TAGS)
- )
- .setMetric(
- AUTOSCALER_SCALING_TIME_METRIC,
- scaleActionStopwatch.millisElapsed()
- ));
+
+ if (spec.usePersistentTasks()) {
+ return changeTaskCountForPerpetualTasks(desiredActiveTaskCount,
successfulScaleAutoScalerCallback);
+ } else {
+ gracefulShutdownInternal();
+ changeTaskCountInIOConfig(desiredActiveTaskCount);
+ clearAllocationInfo();
+ }
+ emitAutoScalerRunMetric(scaleActionStopwatch);
log.info("Changed taskCount to [%s] for supervisor[%s] for
dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource);
return true;
}
}
+ private void emitAutoScalerRunMetric(Stopwatch scaleActionStopwatch)
+ {
+ emitter.emit(ServiceMetricEvent.builder()
+ .setDimension(DruidMetrics.SUPERVISOR_ID,
supervisorId)
+ .setDimension(DruidMetrics.DATASOURCE,
dataSource)
+ .setDimension(DruidMetrics.STREAM,
getIoConfig().getStream())
+ .setDimensionIfNotNull(
+ DruidMetrics.TAGS,
+ spec.getContextValue(DruidMetrics.TAGS)
+ )
+ .setMetric(
+ AUTOSCALER_SCALING_TIME_METRIC,
+ scaleActionStopwatch.millisElapsed()
+ ));
+ }
+
+ /**
+ * Handles task count changes for perpetual tasks using updateConfig instead
of graceful shutdown.
+ * This approach pauses tasks, recalculates partition assignments, and sends
config updates.
+ */
+ private boolean changeTaskCountForPerpetualTasks(int desiredActiveTaskCount,
+ Runnable
successfulScaleAutoScalerCallback
+ )
Review Comment:
Please fix the formatting.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -571,29 +599,306 @@ private boolean changeTaskCount(int
desiredActiveTaskCount)
dataSource
);
final Stopwatch scaleActionStopwatch = Stopwatch.createStarted();
- gracefulShutdownInternal();
- changeTaskCountInIOConfig(desiredActiveTaskCount);
- clearAllocationInfo();
- emitter.emit(ServiceMetricEvent.builder()
- .setDimension(DruidMetrics.SUPERVISOR_ID,
supervisorId)
- .setDimension(DruidMetrics.DATASOURCE,
dataSource)
- .setDimension(DruidMetrics.STREAM,
getIoConfig().getStream())
- .setDimensionIfNotNull(
- DruidMetrics.TAGS,
-
spec.getContextValue(DruidMetrics.TAGS)
- )
- .setMetric(
- AUTOSCALER_SCALING_TIME_METRIC,
- scaleActionStopwatch.millisElapsed()
- ));
Review Comment:
Let's revert this change for now as it is just moving the code.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1716,6 +2042,10 @@ public TaskGroup
addTaskGroupToPendingCompletionTaskGroup(
@VisibleForTesting
public void runInternal()
{
+ if (isDynamicAllocationOngoing.get()) {
+ log.info("Skipping run because dynamic allocation is ongoing.");
Review Comment:
This can get noisy as task scaling can be a slow operation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]