This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 24b757c39ae Do not kill a task if offsets are inconsistent but publish from another group is pending (#19091)
24b757c39ae is described below
commit 24b757c39ae5425afb59f925f3cbfd3141701fa0
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Mar 10 03:03:51 2026 +0530
Do not kill a task if offsets are inconsistent but publish from another group is pending (#19091)
Follow up to #19034
Changes
---------
- Add method `SeekableStreamSupervisor.isAnotherTaskGroupPublishingToPartitions()`
- Use this method to check if a task needs to wait before publishing its own offsets
- Update `SegmentTransactionalAppendAction` and `SegmentTransactionalInsertAction` to return a retryable error response only if there is a pending publish that conflicts with the current action
- Fix behaviour of scale down on task rollover in `SeekableStreamSupervisor`
- Fix bug in `SeekableStreamSupervisorIOConfig`
- Fix bug in `CostBasedAutoScaler` to avoid spurious scale downs
- Validate metrics in `CostBasedAutoScaler` before proceeding with scaling action
- Add new tests in `CostBasedAutoScalerIntegrationTest`
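The retry rule described above can be sketched in isolation. The following is a hypothetical, simplified model (the class `PublishRetryDecision` and method `shouldFailImmediately` are illustrative, not actual Druid classes): a retryable publish failure keeps retrying only while another pending publish overlaps the task's partitions; otherwise the task fails fast instead of retrying indefinitely.

```java
import java.util.Set;

// Simplified sketch of the "fail immediately vs. retry" decision for a
// segment publish. Names are illustrative, not the actual Druid API.
public class PublishRetryDecision
{
  /**
   * @param publishSucceeded  whether the publish action succeeded
   * @param resultIsRetryable whether the metadata store reported a retryable failure
   * @param taskPartitions    partitions the failed task was publishing
   * @param pendingPartitions partitions another task group is still publishing
   * @return true if the task should fail immediately (no more retries)
   */
  public static boolean shouldFailImmediately(
      boolean publishSucceeded,
      boolean resultIsRetryable,
      Set<Integer> taskPartitions,
      Set<Integer> pendingPartitions
  )
  {
    if (publishSucceeded || !resultIsRetryable) {
      // Success, or a hard failure: the retry question does not arise.
      return false;
    }
    // Retry only if some other pending publish overlaps our partitions;
    // with no overlap the offsets are genuinely inconsistent, so fail fast.
    return pendingPartitions.stream().noneMatch(taskPartitions::contains);
  }
}
```

With no overlapping pending publish the task fails immediately; with an overlap it keeps retrying until the conflicting publish completes.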
---
docs/ingestion/supervisor.md | 6 +-
.../CostBasedAutoScalerIntegrationTest.java | 264 +++++++++++----------
.../kinesis/supervisor/KinesisSupervisorTest.java | 3 +-
.../actions/SegmentTransactionalAppendAction.java | 7 +-
.../actions/SegmentTransactionalInsertAction.java | 7 +-
.../indexing/common/actions/TaskActionToolbox.java | 26 ++
.../overlord/supervisor/SupervisorManager.java | 51 ++++
.../supervisor/SeekableStreamSupervisor.java | 179 ++++++++++----
.../SeekableStreamSupervisorIOConfig.java | 9 +-
.../supervisor/autoscaler/CostBasedAutoScaler.java | 151 ++++++++----
.../supervisor/autoscaler/CostMetrics.java | 2 +-
.../supervisor/autoscaler/CostResult.java | 6 -
.../autoscaler/WeightedCostFunction.java | 21 +-
.../SegmentTransactionalInsertActionTest.java | 2 +-
...treamSupervisorScaleDuringTaskRolloverTest.java | 14 +-
.../SeekableStreamSupervisorStateTest.java | 2 +-
.../autoscaler/CostBasedAutoScalerMockTest.java | 5 +
.../autoscaler/CostBasedAutoScalerTest.java | 1 +
.../autoscaler/WeightedCostFunctionTest.java | 46 ++--
.../TransactionalSegmentPublisher.java | 10 +-
.../TransactionalSegmentPublisherTest.java | 8 +-
21 files changed, 539 insertions(+), 281 deletions(-)
diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 360e6ad86d2..8595912e846 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -77,8 +77,8 @@ The following table outlines the configuration properties for `autoScalerConfig`
|--------|-----------|--------|-------|
|`enableTaskAutoScaler`|Enables the autoscaler. If not specified, Druid disables the autoscaler even when `autoScalerConfig` is not null.|No|`false`|
|`taskCountMax`|The maximum number of ingestion tasks. Must be greater than or equal to `taskCountMin`. If `taskCountMax` is greater than the number of Kafka partitions or Kinesis shards, Druid sets the maximum number of reading tasks to the number of Kafka partitions or Kinesis shards and ignores `taskCountMax`.|Yes||
-|`taskCountMin`|The minimum number of ingestion tasks. When you enable the autoscaler, Druid ignores the value of `taskCount` in `ioConfig` and starts with the `taskCountMin` number of tasks to launch.|Yes||
-|`taskCountStart`|Optional config to specify the number of ingestion tasks to start with. When you enable the autoscaler, Druid ignores the value of `taskCount` in `ioConfig` and, if specified, starts with the `taskCountStart` number of tasks. Otherwise, defaults to `taskCountMin`.|No|`taskCountMin`|
+|`taskCountMin`|The minimum number of ingestion tasks. When you enable the autoscaler, Druid computes the initial number of tasks to launch by checking the configs in the following order: `taskCountStart`, then `taskCount` (in `ioConfig`), then `taskCountMin`.|Yes||
+|`taskCountStart`|Optional config to specify the number of ingestion tasks to start with. When you enable the autoscaler, Druid computes the initial number of tasks to launch by checking the configs in the following order: `taskCountStart`, then `taskCount` (in `ioConfig`), then `taskCountMin`.|No|`taskCount` or `taskCountMin`|
|`minTriggerScaleActionFrequencyMillis`|The minimum time interval between two scale actions.| No|600000|
|`autoScalerStrategy`|The algorithm of autoscaler. Druid only supports the `lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more information.|No|`lagBased`|
|`stopTaskCountRatio`|A variable version of `ioConfig.stopTaskCount` with a valid range of (0.0, 1.0]. Allows the maximum number of stoppable tasks in steady state to be proportional to the number of tasks currently running.|No||
@@ -209,7 +209,7 @@ The following table outlines the configuration properties related to the `costBa
|`lagWeight`|The weight of extracted lag value in cost function.| No| 0.25 |
|`idleWeight`|The weight of extracted poll idle value in cost function. | No | 0.75 |
|`useTaskCountBoundaries`|Enables the bounded partitions-per-task window when selecting task counts.|No| `false` |
-|`highLagThreshold`|Per-partition lag threshold that triggers burst scale-up when set to a value greater than `0`. Set to a negative value to disable burst scale-up.|No|-1|
+|`highLagThreshold`|Average partition lag threshold that triggers burst scale-up when set to a value greater than `0`. Set to a negative value to disable burst scale-up.|No|-1|
|`minScaleDownDelay`|Minimum duration between successful scale actions, specified as an ISO-8601 duration string.|No|`PT30M`|
|`scaleDownDuringTaskRolloverOnly`|Indicates whether task scaling down is limited to periods during task rollovers only.|No|`false`|
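The initial-task-count precedence described in the updated docs (`taskCountStart`, then `taskCount` from `ioConfig`, then `taskCountMin`) can be illustrated with a small sketch. The class and method names here are hypothetical, not Druid code:

```java
// Hypothetical sketch of the resolution order for the initial number of
// ingestion tasks when the autoscaler is enabled, per the updated docs:
// taskCountStart, then taskCount (in ioConfig), then taskCountMin.
public class InitialTaskCount
{
  public static int resolve(Integer taskCountStart, Integer taskCount, int taskCountMin)
  {
    if (taskCountStart != null) {
      return taskCountStart; // explicit starting count wins
    }
    if (taskCount != null) {
      return taskCount;      // fall back to ioConfig's taskCount
    }
    return taskCountMin;     // finally, the autoscaler minimum
  }
}
```

This matches the Kinesis test change later in this commit, where an `ioConfig` `taskCount` of 2 is now preferred over a `taskCountMin` of 1.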
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
index 52d2bb54397..6bdb13236bf 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
@@ -19,38 +19,34 @@
package org.apache.druid.testing.embedded.indexing.autoscaler;
-import org.apache.druid.data.input.impl.TimestampSpec;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.kafka.simulate.KafkaResource;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.DruidMetrics;
-import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
-import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
-import org.apache.druid.testing.embedded.EmbeddedHistorical;
-import org.apache.druid.testing.embedded.EmbeddedIndexer;
-import org.apache.druid.testing.embedded.EmbeddedOverlord;
-import org.apache.druid.testing.embedded.EmbeddedRouter;
-import org.apache.druid.testing.embedded.indexing.MoreResources;
-import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
-import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.druid.testing.embedded.StreamIngestResource;
+import org.apache.druid.testing.embedded.indexing.StreamIndexTestBase;
import org.hamcrest.Matchers;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
+import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.OPTIMAL_TASK_COUNT_METRIC;
@@ -60,64 +56,38 @@ import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.Cos
* Tests the autoscaler's ability to compute optimal task counts based on partition count and cost metrics (lag and idle time).
*/
@SuppressWarnings("resource")
-public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
+public class CostBasedAutoScalerIntegrationTest extends StreamIndexTestBase
{
- private static final String TOPIC = EmbeddedClusterApis.createTestDatasourceName();
- private static final String EVENT_TEMPLATE = "{\"timestamp\":\"%s\",\"dimension\":\"value%d\",\"metric\":%d}";
private static final int PARTITION_COUNT = 50;
- private final EmbeddedBroker broker = new EmbeddedBroker();
- private final EmbeddedIndexer indexer = new EmbeddedIndexer();
- private final EmbeddedOverlord overlord = new EmbeddedOverlord();
- private final EmbeddedHistorical historical = new EmbeddedHistorical();
- private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
- private KafkaResource kafkaServer;
+ private String topic;
+ private final KafkaResource kafkaServer = new KafkaResource();
+
+ @Override
+ protected StreamIngestResource<?> getStreamIngestResource()
+ {
+ return kafkaServer;
+ }
@Override
public EmbeddedDruidCluster createCluster()
{
- final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();
-
- kafkaServer = new KafkaResource()
- {
- @Override
- public void start()
- {
- super.start();
- createTopicWithPartitions(TOPIC, PARTITION_COUNT);
- produceRecordsToKafka(500, 1);
- }
+ return super
+ .createCluster()
+ .useDefaultTimeoutForLatchableEmitter(600);
+ }
- @Override
- public void stop()
- {
- deleteTopic(TOPIC);
- super.stop();
- }
- };
-
- indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
- .addProperty("druid.worker.capacity", "100");
-
- cluster.useLatchableEmitter()
- .useDefaultTimeoutForLatchableEmitter(60)
- .addResource(kafkaServer)
- .addServer(coordinator)
- .addServer(overlord)
- .addServer(indexer)
- .addServer(broker)
- .addServer(historical)
- .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.5s")
- .addServer(new EmbeddedRouter());
-
- return cluster;
+ @BeforeEach
+ public void createTopic()
+ {
+ topic = dataSource;
+ kafkaServer.createTopicWithPartitions(topic, PARTITION_COUNT);
}
@Test
@Timeout(45)
public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
{
- final String superId = dataSource + "_super";
final int initialTaskCount = 10;
final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
@@ -135,10 +105,10 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
.minScaleDownDelay(Duration.ZERO)
.build();
- final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, initialTaskCount);
+ final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(autoScalerConfig, initialTaskCount);
// Submit the supervisor
- Assertions.assertEquals(superId, cluster.callApi().postSupervisor(spec));
+ Assertions.assertEquals(spec.getId(), cluster.callApi().postSupervisor(spec));
// Wait for the supervisor to be healthy and running
overlord.latchableEmitter()
@@ -159,8 +129,6 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
@Test
public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
{
- final String superId = dataSource + "_super_scaleup";
-
// Start with a low task count (1 task for 50 partitions) and produce a large amount of data
// to create lag pressure and low idle ratio, which should trigger a scale-up decision.
// With the ideal idle range [0.2, 0.6], a single overloaded task will have idle < 0.2,
@@ -169,7 +137,11 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
// Produce additional records to create a backlog / lag
// This ensures tasks are busy processing (low idle ratio)
- Executors.newSingleThreadExecutor().submit(() -> produceRecordsToKafka(500_000, 20));
+ Executors.newSingleThreadExecutor().submit(() -> {
+ for (int i = 0; i < 500; ++i) {
+ publish1kRecords(topic, true);
+ }
+ });
// These values were carefully handpicked to allow that test to pass in a stable manner.
final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
@@ -185,13 +157,12 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
.build();
final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisorWithAutoScaler(
- superId,
autoScalerConfig,
lowInitialTaskCount
);
// Submit the supervisor
- Assertions.assertEquals(superId, cluster.callApi().postSupervisor(kafkaSupervisorSpec));
+ Assertions.assertEquals(kafkaSupervisorSpec.getId(), cluster.callApi().postSupervisor(kafkaSupervisorSpec));
// Wait for the supervisor to be healthy and running
overlord.latchableEmitter()
@@ -209,10 +180,93 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
}
+ @Test
+ public void test_autoScaler_scalesUpAndDown_withSlowPublish()
+ {
+ final String topic = EmbeddedClusterApis.createTestDatasourceName();
+ kafkaServer.createTopicWithPartitions(topic, 4);
+
+ // A small value of maxRowsPerSegment ensures that there are a large number
+ // of segments to publish, thus slowing down publish actions
+ final int maxRowsPerSegment = 100;
+
+ final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
+ .builder()
+ .taskCountMin(1)
+ .taskCountMax(4)
+ .lagWeight(1.0)
+ .idleWeight(1.0)
+ .enableTaskAutoScaler(true)
+ .minTriggerScaleActionFrequencyMillis(10L)
+ .scaleActionPeriodMillis(10L)
+ .minScaleDownDelay(Duration.standardSeconds(1))
+ .build();
+
+ // taskDuration of 10s gives enough time to auto-scaler to fetch task metrics
+ final SupervisorSpec supervisor = createKafkaSupervisor(kafkaServer)
+ .withTuningConfig(t -> t.withMaxRowsPerSegment(maxRowsPerSegment))
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withTaskCount(1)
+ .withTaskDuration(Period.seconds(10))
+ .withSupervisorRunPeriod(Period.millis(10))
+ .withAutoScalerConfig(autoScalerConfig)
+ )
+ .build(dataSource, topic);
+ cluster.callApi().postSupervisor(supervisor);
+
+ // Ingest a large number of records to trigger a scale-up
+ // 10k records = 100 segments to publish * 100 rows per segment
+ int totalRecords = 0;
+ for (int i = 0; i < 10; ++i) {
+ totalRecords += publish1kRecords(topic, false);
+ }
+
+ // Wait for tasks to scale up
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("task/autoScaler/updatedCount")
+ .hasDimension(DruidMetrics.SUPERVISOR_ID, supervisor.getId())
+ .hasValueMatching(Matchers.equalTo(4L))
+ );
+ Assertions.assertEquals(4, getCurrentTaskCount(supervisor.getId()));
+ waitUntilPublishedRecordsAreIngested(totalRecords);
+
+ // Let the tasks work through the lag.
+ // Do not publish any more records so that the idleness causes scale-down
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("task/autoScaler/updatedCount")
+ .hasDimension(DruidMetrics.SUPERVISOR_ID, supervisor.getId())
+ .hasValueMatching(Matchers.equalTo(1L))
+ );
+ Assertions.assertEquals(1, getCurrentTaskCount(supervisor.getId()));
+
+ cluster.callApi().postSupervisor(supervisor.createSuspendedSpec());
+ cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+ Assertions.assertEquals("10000", cluster.runSql("SELECT COUNT(*) FROM %s", dataSource));
+
+ final List<TaskStatusPlus> tasks = cluster.callApi().getTasks(dataSource, "complete");
+ Assertions.assertFalse(tasks.isEmpty());
+
+ // Ensure that there are no task failures due to auto-scaling
+ final String expectedErrorOnShutdown = "Killing task for graceful shutdown";
+ final Map<String, String> taskIdToError = new HashMap<>();
+ for (TaskStatusPlus task : tasks) {
+ if (task.getStatusCode() == TaskState.FAILED && !expectedErrorOnShutdown.equals(task.getErrorMsg())) {
+ taskIdToError.put(task.getId(), task.getErrorMsg());
+ }
+ }
+ Assertions.assertTrue(
+ taskIdToError.isEmpty(),
+ StringUtils.format(
+ "[%d / %d] tasks have failed with errors: %s",
+ taskIdToError.size(), tasks.size(), taskIdToError
+ )
+ );
+ }
+
@Test
void test_scaleDownDuringTaskRollover()
{
- final String superId = dataSource + "_super";
final int initialTaskCount = 10;
final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
@@ -220,9 +274,8 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
.enableTaskAutoScaler(true)
.taskCountMin(1)
.taskCountMax(10)
- .taskCountStart(initialTaskCount)
- .scaleActionPeriodMillis(2000)
- .minTriggerScaleActionFrequencyMillis(2000)
+ .scaleActionPeriodMillis(100)
+ .minTriggerScaleActionFrequencyMillis(100)
// High idle weight ensures scale-down when tasks are mostly idle (little data to process)
.lagWeight(0.1)
.idleWeight(0.9)
@@ -231,10 +284,10 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
.minScaleDownDelay(Duration.ZERO)
.build();
- final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, initialTaskCount);
+ final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(autoScalerConfig, initialTaskCount);
// Submit the supervisor
- Assertions.assertEquals(superId, cluster.callApi().postSupervisor(spec));
+ Assertions.assertEquals(spec.getId(), cluster.callApi().postSupervisor(spec));
// Wait for at least one task running for the datasource managed by the supervisor.
overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/run/time")
@@ -242,19 +295,14 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
// Wait for autoscaler to emit metric indicating scale-down, it should be just less than the current task count.
overlord.latchableEmitter().waitForEvent(
- event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
- .hasValueMatching(Matchers.lessThan((long) initialTaskCount)));
-
- // Wait for tasks to complete (first rollover)
- overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/action/success/count"));
-
- // Wait for the task running for the datasource managed by a supervisor.
- overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/run/time")
- .hasDimension(DruidMetrics.DATASOURCE, dataSource));
+ event -> event.hasMetricName("task/autoScaler/updatedCount")
+ .hasDimension(DruidMetrics.SUPERVISOR_ID, spec.getId())
+ .hasValueMatching(Matchers.lessThan((long) initialTaskCount))
+ );
// After rollover, verify that the running task count has decreased
// The autoscaler should have recommended fewer tasks due to high idle time
- final int postRolloverRunningTasks = cluster.callApi().getTaskCount("running", dataSource);
+ final int postRolloverRunningTasks = getCurrentTaskCount(spec.getId());
Assertions.assertTrue(
postRolloverRunningTasks < initialTaskCount,
@@ -269,53 +317,29 @@ public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
cluster.callApi().postSupervisor(spec.createSuspendedSpec());
}
- private void produceRecordsToKafka(int recordCount, int iterations)
- {
- int recordCountPerSlice = recordCount / iterations;
- int counter = 0;
- for (int i = 0; i < iterations; i++) {
- DateTime timestamp = DateTime.now(DateTimeZone.UTC);
- List<ProducerRecord<byte[], byte[]>> records = IntStream
- .range(counter, counter + recordCountPerSlice)
- .mapToObj(k -> new ProducerRecord<byte[], byte[]>(
- TOPIC,
- k % PARTITION_COUNT,
- null,
- StringUtils.format(EVENT_TEMPLATE, timestamp, k, k)
- .getBytes(StandardCharsets.UTF_8)
- )
- )
- .collect(Collectors.toList());
-
- kafkaServer.produceRecordsToTopic(records);
- try {
- Thread.sleep(100L);
- counter += recordCountPerSlice;
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
private KafkaSupervisorSpec createKafkaSupervisorWithAutoScaler(
- String supervisorId,
CostBasedAutoScalerConfig autoScalerConfig,
int taskCount
)
{
- return MoreResources.Supervisor.KAFKA_JSON
- .get()
- .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", "iso", null)))
- .withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(100))
+ return createKafkaSupervisor(kafkaServer)
.withIoConfig(
ioConfig -> ioConfig
- .withConsumerProperties(kafkaServer.consumerProperties())
.withTaskCount(taskCount)
- .withTaskDuration(Period.seconds(7))
+ .withTaskDuration(Period.seconds(1))
.withAutoScalerConfig(autoScalerConfig)
)
- .withId(supervisorId)
- .build(dataSource, TOPIC);
+ .build(dataSource, topic);
+ }
+
+ private int getCurrentTaskCount(String supervisorId)
+ {
+ final String getSupervisorPath = StringUtils.format("/druid/indexer/v1/supervisor/%s", supervisorId);
+ final KafkaSupervisorSpec supervisorSpec = cluster.callApi().serviceClient().onLeaderOverlord(
+ mapper -> new RequestBuilder(HttpMethod.GET, getSupervisorPath),
+ new TypeReference<>(){}
+ );
+ Assertions.assertNotNull(supervisorSpec);
+ return supervisorSpec.getSpec().getIOConfig().getTaskCount();
}
}
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 78d3c6791ab..522aebad4cf 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -428,8 +428,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
replayAll();
int taskCountInit = supervisor.getIoConfig().getTaskCount();
- // when enable autoScaler the init taskCount will be equal to taskCountMin
- Assert.assertEquals(1, taskCountInit);
+ Assert.assertEquals(2, taskCountInit);
supervisor.getIoConfig().setTaskCount(2);
supervisor.start();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
index 3833c4be6b0..15001fcd4a2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
@@ -219,7 +219,12 @@ public class SegmentTransactionalAppendAction implements TaskAction<SegmentPubli
}
IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
- return retVal;
+
+ if (toolbox.shouldFailSegmentPublishImmediately(retVal, task, supervisorId, startMetadata)) {
+ return SegmentPublishResult.fail(retVal.getErrorMsg());
+ } else {
+ return retVal;
+ }
}
@Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index ab84c38cc6a..bf121812573 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -259,7 +259,12 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli
}
IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
- return retVal;
+
+ if (toolbox.shouldFailSegmentPublishImmediately(retVal, task, supervisorId, startMetadata)) {
+ return SegmentPublishResult.fail(retVal.getErrorMsg());
+ } else {
+ return retVal;
+ }
}
private void checkWithSegmentLock()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java
index d430e3f5a9f..6294710f167 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java
@@ -23,8 +23,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.GlobalTaskLockbox;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskStorage;
@@ -135,4 +138,27 @@ public class TaskActionToolbox
{
return segmentAllocationQueue != null && segmentAllocationQueue.isEnabled();
}
+
+ /**
+ * Checks if the given publish action should be failed without allowing any
+ * more retries. A failed publish action should be retried only if there is
+ * another task waiting to publish offsets for an overlapping set of partitions.
+ */
+ public boolean shouldFailSegmentPublishImmediately(
+ SegmentPublishResult result,
+ Task task,
+ String supervisorId,
+ DataSourceMetadata startMetadata
+ )
+ {
+ if (result.isSuccess() || !result.isRetryable() || startMetadata == null) {
+ return false;
+ }
+
+ return !getSupervisorManager().isAnotherTaskGroupPublishingToPartitions(
+ supervisorId,
+ task.getId(),
+ startMetadata
+ );
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 0cd799bed54..426f3a71619 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -27,11 +27,14 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.error.NotFound;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.common.Pair;
@@ -427,6 +430,54 @@ public class SupervisorManager
return false;
}
+ /**
+ * Checks if there is a Task distinct from the given {@code taskId} or its replicas
+ * that is currently waiting to publish offsets for the given partitions.
+ */
+ public boolean isAnotherTaskGroupPublishingToPartitions(
+ String supervisorId,
+ String taskId,
+ DataSourceMetadata startMetadata
+ )
+ {
+ try {
+ InvalidInput.conditionalException(supervisorId != null, "'supervisorId' cannot be null");
+ if (!(startMetadata instanceof SeekableStreamDataSourceMetadata<?, ?>)) {
+ throw InvalidInput.exception(
+ "Start metadata[%s] of type[%s] is not valid streaming metadata",
+ supervisorId, startMetadata.getClass()
+ );
+ }
+
+ Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
+ if (supervisor == null || supervisor.rhs == null) {
+ throw NotFound.exception("Could not find supervisor[%s]", supervisorId);
+ }
+ if (!(supervisor.lhs instanceof SeekableStreamSupervisor<?, ?, ?>)) {
+ throw InvalidInput.exception(
+ "Supervisor[%s] of type[%s] is not a streaming supervisor",
+ supervisorId, supervisor.rhs.getType()
+ );
+ }
+
+ final Set<Object> partitionIds = Set.copyOf(
+ ((SeekableStreamDataSourceMetadata<?, ?>) startMetadata)
+ .getSeekableStreamSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .keySet()
+ );
+ return ((SeekableStreamSupervisor<?, ?, ?>) supervisor.lhs)
+ .isAnotherTaskGroupPublishingToPartitions(taskId, partitionIds);
+ }
+ catch (Exception e) {
+ log.error(
+ e,
+ "Failed to check if a publish is pending for supervisor[%s], metadata[%s]",
+ supervisorId, startMetadata
+ );
+ return false;
+ }
+ }
/**
* Stops a supervisor with a given id and then removes it from the list.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index f5128dc3809..dbe79b85780 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -130,6 +130,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
@@ -157,6 +158,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
public static final String AUTOSCALER_SKIP_REASON_DIMENSION = "scalingSkipReason";
public static final String AUTOSCALER_REQUIRED_TASKS_METRIC = "task/autoScaler/requiredCount";
+ public static final String AUTOSCALER_UPDATED_TASK_METRIC = "task/autoScaler/updatedCount";
public static final String AUTOSCALER_SCALING_TIME_METRIC = "task/autoScaler/scaleActionTime";
private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
@@ -503,7 +505,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
for (CopyOnWriteArrayList<TaskGroup> list : pendingCompletionTaskGroups.values()) {
- if (!list.isEmpty()) {
+ // There are expected to be pending tasks if this scaling is happening on task rollover
+ if (!list.isEmpty() && !isScalingTasksOnRollover.get()) {
log.info(
"Skipping DynamicAllocationTasksNotice execution for supervisor[%s] for datasource[%s] because following tasks are pending [%s]",
supervisorId,
@@ -591,9 +594,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private boolean changeTaskCount(int desiredActiveTaskCount)
throws InterruptedException, ExecutionException
{
- int currentActiveTaskCount;
- Collection<TaskGroup> activeTaskGroups =
activelyReadingTaskGroups.values();
- currentActiveTaskCount = activeTaskGroups.size();
+ final int currentActiveTaskCount = getCurrentTaskCount();
if (desiredActiveTaskCount < 0 || desiredActiveTaskCount ==
currentActiveTaskCount) {
return false;
@@ -609,19 +610,15 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
gracefulShutdownInternal();
changeTaskCountInIOConfig(desiredActiveTaskCount);
clearPartitionAssignmentsForScaling();
- emitter.emit(ServiceMetricEvent.builder()
- .setDimension(DruidMetrics.SUPERVISOR_ID,
supervisorId)
- .setDimension(DruidMetrics.DATASOURCE,
dataSource)
- .setDimension(DruidMetrics.STREAM,
getIoConfig().getStream())
- .setDimensionIfNotNull(
- DruidMetrics.TAGS,
-
spec.getContextValue(DruidMetrics.TAGS)
- )
- .setMetric(
- AUTOSCALER_SCALING_TIME_METRIC,
- scaleActionStopwatch.millisElapsed()
- ));
- log.info("Changed taskCount to [%s] for supervisor[%s] for
dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource);
+
+
+ final ServiceMetricEvent.Builder metricBuilder = getMetricBuilder();
+ emitter.emit(metricBuilder.setMetric(AUTOSCALER_SCALING_TIME_METRIC,
scaleActionStopwatch.millisElapsed()));
+ emitter.emit(metricBuilder.setMetric(AUTOSCALER_UPDATED_TASK_METRIC,
(long) desiredActiveTaskCount));
+ log.info(
+ "Updated taskCount from [%s] to [%s] for supervisor[%s] for
dataSource[%s].",
+ currentActiveTaskCount, desiredActiveTaskCount, supervisorId,
dataSource
+ );
return true;
}
}
@@ -643,6 +640,14 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+ /**
+ * @return Current task count for this supervisor.
+ */
+ private int getCurrentTaskCount()
+ {
+ return getIoConfig().getTaskCount();
+ }
+
/**
* Clears previous partition assignments in preparation for an upcoming
scaling event.
* <p>
@@ -968,6 +973,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* Wired by {@link SupervisorManager} after supervisor creation.
*/
private volatile SupervisorTaskAutoScaler taskAutoScaler;
+ private final AtomicBoolean isScalingTasksOnRollover = new
AtomicBoolean(false);
// snapshots latest sequences from the stream to be verified in the next run
cycle of inactive stream check
private final Map<PartitionIdType, SequenceOffsetType>
previousSequencesFromStream = new HashMap<>();
@@ -2615,9 +2621,17 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return sequence.compareTo(latestOffset) == 0;
}
- ) && earliestConsistentSequenceId.compareAndSet(-1,
sequenceCheckpoint.getKey())) || (
- !pendingCompletionTaskGroups.getOrDefault(groupId, new
CopyOnWriteArrayList<>()).isEmpty()
- && earliestConsistentSequenceId.compareAndSet(-1,
taskCheckpoints.firstKey()))) {
+ ) && earliestConsistentSequenceId.compareAndSet(-1,
sequenceCheckpoint.getKey()))
+ // OR if there is an older publishing task for the same groupId,
use the first sequenceId of this task
+ // since the offsets persisted in metadata store might still be
behind the checkpoints of this task
+ || (!pendingCompletionTaskGroups.getOrDefault(groupId, new
CopyOnWriteArrayList<>()).isEmpty()
+ && earliestConsistentSequenceId.compareAndSet(-1,
taskCheckpoints.firstKey()))
+
+ // OR if there is an older publishing task for another groupId,
use the first sequenceId of this task
+ // since the offsets persisted in metadata store might still be
behind the checkpoints of this task
+ || (isAnotherTaskGroupPublishingToPartitions(taskId,
Set.copyOf(taskGroup.startingSequences.keySet()))
+ && earliestConsistentSequenceId.compareAndSet(-1,
taskCheckpoints.firstKey()))
+ ) {
final SortedMap<Integer, Map<PartitionIdType, SequenceOffsetType>>
latestCheckpoints = new TreeMap<>(
taskCheckpoints.tailMap(earliestConsistentSequenceId.get())
);
@@ -2626,8 +2640,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskGroup.checkpointSequences.clear();
taskGroup.checkpointSequences.putAll(latestCheckpoints);
} else {
- log.debug(
- "Adding task[%s] to kill list, checkpoints[%s], latestoffsets
from DB [%s]",
+ log.warn(
+ "Adding task[%s] to kill list as its checkpoints[%s] are not
consistent with checkpoints[%s] in metadata store",
taskId,
taskCheckpoints,
latestOffsetsFromDb
@@ -2641,8 +2655,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
.equals(taskGroup.checkpointSequences.firstEntry().getValue()))
||
taskCheckpoints.tailMap(taskGroup.checkpointSequences.firstKey()).size()
!= taskGroup.checkpointSequences.size()) {
- log.debug(
- "Adding task[%s] to kill list, checkpoints[%s], taskgroup
checkpoints [%s]",
+ log.warn(
+ "Adding task[%s] to kill list as its checkpoints[%s] are not
consistent with taskGroup checkpoints[%s]",
taskId,
taskCheckpoints,
taskGroup.checkpointSequences
@@ -2669,18 +2683,57 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
sequenceCheckpoint -> {
killTask(
sequenceCheckpoint.lhs,
- "Killing task[%s], as its checkpoints[%s] are not consistent
with group checkpoints[%s]"
- + " or latest persisted sequences in metadata store[%s].",
- sequenceCheckpoint.lhs,
- sequenceCheckpoint.rhs,
- taskGroup.checkpointSequences,
- latestOffsetsFromDb
+ "Killing task as its checkpoints are not consistent with group
checkpoints"
+ + " or latest persisted sequences in metadata store."
);
taskGroup.removeTask(sequenceCheckpoint.lhs);
}
);
}
+ /**
+ * Checks whether any task, other than the given {@code taskId} and its
+ * replicas, is currently publishing to any of the given partitions. If this
+ * method returns true, the current task must wait for those tasks to finish
+ * publishing before it can publish its own offsets.
+ */
+ public boolean isAnotherTaskGroupPublishingToPartitions(String taskId,
Set<Object> partitions)
+ {
+ // Identify all the partitions that are being published by other taskGroups
+ final Map<PartitionIdType, Set<TaskGroup>> partitionIdToPublishingGroups =
new HashMap<>();
+
pendingCompletionTaskGroups.values().stream().flatMap(Collection::stream).forEach(group
-> {
+ // Do not consider this taskId or its replicas
+ if (group.taskIds().contains(taskId)) {
+ return;
+ }
+ for (PartitionIdType partitionId : group.startingSequences.keySet()) {
+ partitionIdToPublishingGroups.computeIfAbsent(partitionId, p -> new
HashSet<>()).add(group);
+ }
+ });
+
+ if (partitionIdToPublishingGroups.isEmpty()) {
+ return false;
+ }
+
+ // Check if any of the partitions of this taskId are being published by
other taskGroups
+ for (Object partition : partitions) {
+ @SuppressWarnings("unchecked")
+ final PartitionIdType partitionId = (PartitionIdType) partition;
+ if (partitionIdToPublishingGroups.containsKey(partitionId)) {
+ log.info(
+ "Task[%s] needs to wait before publishing as other taskGroups[%s]
are currently publishing to partition[%s].",
+ taskId,
+ partitionIdToPublishingGroups.get(partitionId),
+ partitionId
+ );
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+
@VisibleForTesting
protected void addDiscoveredTaskToPendingCompletionTaskGroups(
int groupId,
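The `isAnotherTaskGroupPublishingToPartitions` check added above can be sketched in isolation. This is a simplified model of the same logic: plain strings stand in for the generic partition and task-id types, and the `Group` record and `isAnotherGroupPublishing` name are illustrative, not part of the patch.

```java
import java.util.List;
import java.util.Set;

public class PublishConflictSketch
{
  // A pending-completion task group: its task ids (replicas) and the partitions it publishes to.
  record Group(Set<String> taskIds, Set<String> partitions) {}

  // True if some pending group, other than the one containing taskId, publishes
  // to any of the given partitions. Mirrors the conflict check in the patch.
  static boolean isAnotherGroupPublishing(String taskId, Set<String> partitions, List<Group> pendingGroups)
  {
    for (Group group : pendingGroups) {
      if (group.taskIds().contains(taskId)) {
        continue; // do not consider this task or its replicas
      }
      for (String partition : group.partitions()) {
        if (partitions.contains(partition)) {
          return true; // the task must wait before publishing
        }
      }
    }
    return false;
  }

  public static void main(String[] args)
  {
    List<Group> pending = List.of(new Group(Set.of("task-a"), Set.of("p0", "p1")));
    System.out.println(isAnotherGroupPublishing("task-b", Set.of("p1"), pending)); // conflict
    System.out.println(isAnotherGroupPublishing("task-a", Set.of("p1"), pending)); // own group, no conflict
  }
}
```

A conflict only delays publishing; unlike the pre-#19091 behavior, the task is no longer killed when such a pending publish exists.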
@@ -2911,7 +2964,13 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
public int getPartitionCount()
{
- return recordSupplier.getPartitionIds(ioConfig.getStream()).size();
+ recordSupplierLock.lock();
+ try {
+ return recordSupplier.getPartitionIds(ioConfig.getStream()).size();
+ }
+ finally {
+ recordSupplierLock.unlock();
+ }
}
private boolean updatePartitionDataFromStream()
@@ -3412,6 +3471,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
final AtomicInteger numStoppedTasks = new AtomicInteger();
+ final AtomicBoolean hasTaskRolloverStarted = new AtomicBoolean(false);
// Sort task groups by start time to prioritize early termination of
earlier groups, then iterate for processing
activelyReadingTaskGroups.entrySet().stream().sorted(
Comparator.comparingLong(
@@ -3435,6 +3495,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (group.getHandoffEarly()) {
numStoppedTasks.getAndIncrement();
}
+ if (stopTasksEarly) {
+ hasTaskRolloverStarted.set(true);
+ }
} else if
(earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
// Stop this task group if it has run longer than the configured
duration
// and the pending task groups are less than the configured stop
task count.
@@ -3448,6 +3511,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(group, true));
numStoppedTasks.getAndIncrement();
+ hasTaskRolloverStarted.set(true);
}
}
});
@@ -3503,7 +3567,10 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
activelyReadingTaskGroups.remove(groupId);
}
- maybeScaleDuringTaskRollover();
+ // If rollover has started, check if scaling needs to be done
+ if (numStoppedTasks.get() > 0 && hasTaskRolloverStarted.get()) {
+ maybeScaleDuringTaskRollover();
+ }
}
/**
@@ -3511,27 +3578,27 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* <p>
* This method is invoked to determine whether a task count adjustment is
needed
* during a task rollover based on the recommendations from the task
auto-scaler.
- */
-
+ */
@VisibleForTesting
- void maybeScaleDuringTaskRollover()
+ void maybeScaleDuringTaskRollover() throws ExecutionException,
InterruptedException
{
- if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
- int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
- if (rolloverTaskCount > 0 && rolloverTaskCount !=
getIoConfig().getTaskCount()) {
- log.info("Autoscaler recommends scaling down to [%d] tasks during
rollover", rolloverTaskCount);
- changeTaskCountInIOConfig(rolloverTaskCount);
- // Here force reset the supervisor state to be re-calculated on the
next iteration of runInternal() call.
- // This seems the best way to inject task amount recalculation during
the rollover.
- clearPartitionAssignmentsForScaling();
-
- ServiceMetricEvent.Builder event = ServiceMetricEvent
- .builder()
- .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
- .setDimension(DruidMetrics.DATASOURCE, dataSource)
- .setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
-
- emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC,
rolloverTaskCount));
+ if (taskAutoScaler == null) {
+ // Do nothing
+ } else {
+ log.info("Computing optimal taskCount for supervisor[%s] during
rollover.", supervisorId);
+ final int currentTaskCount = getCurrentTaskCount();
+ final int rolloverTaskCount =
taskAutoScaler.computeTaskCountForRollover();
+ if (rolloverTaskCount > 0 && rolloverTaskCount != currentTaskCount) {
+ log.info(
+ "Updating taskCount for supervisor[%s] from [%d] to [%d] during
rollover.",
+ supervisorId, currentTaskCount, rolloverTaskCount
+ );
+ isScalingTasksOnRollover.set(true);
+ new DynamicAllocationTasksNotice(
+ () -> rolloverTaskCount,
+ () -> isScalingTasksOnRollover.set(false),
+ emitter
+ ).handle();
}
}
}
@@ -4825,6 +4892,16 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+ private ServiceMetricEvent.Builder getMetricBuilder()
+ {
+ return ServiceMetricEvent
+ .builder()
+ .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
+ .setDimensionIfNotNull(DruidMetrics.TAGS,
spec.getContextValue(DruidMetrics.TAGS));
+ }
+
protected void emitLag()
{
SupervisorStateManager.State basicState =
stateManager.getSupervisorState().getBasicState();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 9386ac365f8..633bd9b70dc 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.seekablestream.supervisor;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.error.InvalidInput;
import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
@@ -88,10 +89,12 @@ public abstract class SeekableStreamSupervisorIOConfig
// Could be null
this.autoScalerConfig = autoScalerConfig;
this.autoScalerEnabled = autoScalerConfig != null &&
autoScalerConfig.getEnableTaskAutoScaler();
- // if autoscaler is enabled, then taskCount will be ignored here and
initial taskCount will equal to taskCountStart/taskCountMin
if (autoScalerEnabled) {
- final Integer startTaskCount = autoScalerConfig.getTaskCountStart();
- this.taskCount = startTaskCount != null ? startTaskCount :
autoScalerConfig.getTaskCountMin();
+ // Priority: taskCountStart > taskCount > taskCountMin
+ this.taskCount = Configs.valueOrDefault(
+ autoScalerConfig.getTaskCountStart(),
+ Configs.valueOrDefault(taskCount, autoScalerConfig.getTaskCountMin())
+ );
} else {
this.taskCount = taskCount != null ? taskCount : 1;
}
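The fallback chain above (taskCountStart, then taskCount, then taskCountMin) can be illustrated with a small self-contained helper. `valueOrDefault` here mirrors Druid's `Configs.valueOrDefault`; `resolveTaskCount` is an illustrative name, not part of the patch.

```java
public class TaskCountResolution
{
  // Mirrors Configs.valueOrDefault: return the value if non-null, else the default.
  static Integer valueOrDefault(Integer value, Integer defaultValue)
  {
    return value != null ? value : defaultValue;
  }

  // Priority when the autoscaler is enabled: taskCountStart > taskCount > taskCountMin.
  static int resolveTaskCount(Integer taskCountStart, Integer taskCount, int taskCountMin)
  {
    return valueOrDefault(taskCountStart, valueOrDefault(taskCount, taskCountMin));
  }

  public static void main(String[] args)
  {
    System.out.println(resolveTaskCount(4, 10, 1));       // taskCountStart wins -> 4
    System.out.println(resolveTaskCount(null, 10, 1));    // falls back to taskCount -> 10
    System.out.println(resolveTaskCount(null, null, 1));  // falls back to taskCountMin -> 1
  }
}
```

The bug being fixed is visible in the old code: a user-supplied `taskCount` was silently ignored whenever the autoscaler was enabled, even when `taskCountStart` was absent.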
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 9c42a6f35c9..20da7ab7901 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -29,6 +29,7 @@ import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAu
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -62,10 +63,17 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
public static final String LAG_COST_METRIC =
"task/autoScaler/costBased/lagCost";
public static final String IDLE_COST_METRIC =
"task/autoScaler/costBased/idleCost";
public static final String OPTIMAL_TASK_COUNT_METRIC =
"task/autoScaler/costBased/optimalTaskCount";
+ public static final String INVALID_METRICS_COUNT =
"task/autoScaler/costBased/invalidMetrics";
static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK =
MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
+ /**
+ * If average partition lag crosses this value and the processing rate is
+ * still zero, scaling actions are skipped and an alert is raised.
+ */
+ static final int MAX_IDLENESS_PARTITION_LAG = 10_000;
+
/**
* Divisor for partition count in the K formula: K = (partitionCount /
K_PARTITION_DIVISOR) / sqrt(currentTaskCount).
* This controls how aggressive the scaling is relative to partition count.
@@ -78,7 +86,6 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
private final ServiceEmitter emitter;
private final SupervisorSpec spec;
private final CostBasedAutoScalerConfig config;
- private final ServiceMetricEvent.Builder metricBuilder;
private final ScheduledExecutorService autoscalerExecutor;
private final WeightedCostFunction costFunction;
private volatile CostMetrics lastKnownMetrics;
@@ -101,7 +108,11 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
this.costFunction = new WeightedCostFunction();
this.autoscalerExecutor =
Execs.scheduledSingleThreaded("CostBasedAutoScaler-"
+
StringUtils.encodeForFormat(spec.getId()));
- this.metricBuilder =
+ }
+
+ private ServiceMetricEvent.Builder getMetricBuilder()
+ {
+ return
ServiceMetricEvent.builder()
.setDimension(DruidMetrics.SUPERVISOR_ID,
supervisorId)
.setDimension(
@@ -161,29 +172,26 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
public int computeTaskCountForScaleAction()
{
lastKnownMetrics = collectMetrics();
- if (lastKnownMetrics == null) {
- log.debug("Metrics not available for supervisorId [%s], skipping scaling
action", supervisorId);
- return -1;
- }
final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
- final int currentTaskCount = lastKnownMetrics.getCurrentTaskCount();
+ final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
// Perform scale-up actions; scale-down actions only if configured.
- int taskCount = -1;
+ final int taskCount;
if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
taskCount = optimalTaskCount;
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
- log.info("New task count [%d] on supervisor [%s], scaling up",
taskCount, supervisorId);
+ log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale
up).", supervisorId, currentTaskCount, taskCount);
} else if (!config.isScaleDownOnTaskRolloverOnly()
&& isScaleActionAllowed()
&& optimalTaskCount < currentTaskCount
&& optimalTaskCount > 0) {
taskCount = optimalTaskCount;
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
- log.info("New task count [%d] on supervisor [%s], scaling down",
taskCount, supervisorId);
+ log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale
down).", supervisorId, currentTaskCount, taskCount);
} else {
- log.info("No scaling required for supervisor [%s]", supervisorId);
+ taskCount = -1;
+ log.debug("No scaling required for supervisor[%s]", supervisorId);
}
return taskCount;
}
@@ -206,8 +214,14 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
*/
int computeOptimalTaskCount(CostMetrics metrics)
{
- if (metrics == null) {
- log.debug("No metrics available yet for supervisorId [%s]",
supervisorId);
+ final Either<String, Boolean> result = validateMetricsForScaling(metrics);
+ if (result.isError()) {
+ log.debug("Valid metrics are not yet available for scaling
supervisor[%s]", supervisorId);
+ emitter.emit(
+ getMetricBuilder()
+ .setDimension(DruidMetrics.DESCRIPTION, result.error())
+ .setMetric(INVALID_METRICS_COUNT, 1L)
+ );
return -1;
}
@@ -228,55 +242,52 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
);
if (validTaskCounts.length == 0) {
- log.warn("No valid task counts after applying constraints for
supervisorId [%s]", supervisorId);
+ log.warn("No valid task counts after applying constraints for
supervisor[%s]", supervisorId);
return -1;
}
- int optimalTaskCount = -1;
- CostResult optimalCost = new CostResult();
+ // Start with the current task count as optimal
+ int optimalTaskCount = currentTaskCount;
+ CostResult optimalCost = costFunction.computeCost(metrics,
currentTaskCount, config);
log.info(
- "Current metrics: avgPartitionLag[%.1f], pollIdleRatio[%.1f],
lagWeight[%.1f], idleWeight[%.1f]",
- metrics.getAggregateLag(),
+ "Computing optimal taskCount for supervisor[%s] with metrics:"
+ + " currentTaskCount[%d], avgPartitionLag[%.1f],
avgProcessingRate[%.1f],"
+ + " pollIdleRatio[%.1f], lagWeight[%.1f], idleWeight[%.1f].",
+ supervisorId,
+ currentTaskCount,
+ metrics.getAvgPartitionLag(),
+ metrics.getAvgProcessingRate(),
metrics.getPollIdleRatio(),
config.getLagWeight(),
config.getIdleWeight()
);
- for (int taskCount : validTaskCounts) {
+ // Find the task count which reduces cost
+ final CostResult[] costResults = new CostResult[validTaskCounts.length];
+ for (int i = 0; i < validTaskCounts.length; ++i) {
+ final int taskCount = validTaskCounts[i];
CostResult costResult = costFunction.computeCost(metrics, taskCount,
config);
double cost = costResult.totalCost();
- log.info(
- "Proposed task count[%d] has total cost[%.4f] = lagCost[%.4f] +
idleCost[%.4f].",
- taskCount,
- cost,
- costResult.lagCost(),
- costResult.idleCost()
- );
+ costResults[i] = costResult;
if (cost < optimalCost.totalCost()) {
optimalTaskCount = taskCount;
optimalCost = costResult;
}
}
- emitter.emit(metricBuilder.setMetric(OPTIMAL_TASK_COUNT_METRIC, (long)
optimalTaskCount));
- emitter.emit(metricBuilder.setMetric(LAG_COST_METRIC,
optimalCost.lagCost()));
- emitter.emit(metricBuilder.setMetric(IDLE_COST_METRIC,
optimalCost.idleCost()));
-
- log.debug(
- "Cost-based scaling evaluation for supervisorId [%s]: current=%d,
optimal=%d, cost=%.4f, "
- + "avgPartitionLag=%.2f, pollIdleRatio=%.3f",
- supervisorId,
- metrics.getCurrentTaskCount(),
- optimalTaskCount,
- optimalCost.totalCost(),
- metrics.getAvgPartitionLag(),
- metrics.getPollIdleRatio()
- );
+ emitter.emit(getMetricBuilder().setMetric(OPTIMAL_TASK_COUNT_METRIC,
(long) optimalTaskCount));
+ emitter.emit(getMetricBuilder().setMetric(LAG_COST_METRIC,
optimalCost.lagCost()));
+ emitter.emit(getMetricBuilder().setMetric(IDLE_COST_METRIC,
optimalCost.idleCost()));
if (optimalTaskCount == currentTaskCount) {
return -1;
+ } else {
+ log.info(
+ "Optimal taskCount[%d] for supervisor[%s] has lowest cost[%.4f] out
of the following candidates: %n%s",
+ optimalTaskCount, supervisorId, optimalCost.totalCost(),
constructCostTable(validTaskCounts, costResults)
+ );
}
// Scale up is performed eagerly.
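The selection loop above — seed the optimum with the current task count, scan the candidates, and return -1 when no change is needed — can be sketched as follows. The class and method names are illustrative and the cost function in `main` is a toy, not Druid's `WeightedCostFunction`.

```java
import java.util.function.IntToDoubleFunction;

public class OptimalTaskCountSketch
{
  // Returns the candidate task count with the lowest cost, or -1 when the
  // current count is already optimal (i.e. no scaling action is needed).
  static int chooseTaskCount(int currentTaskCount, int[] candidates, IntToDoubleFunction cost)
  {
    // Start with the current task count as optimal, as in the patched code.
    int optimal = currentTaskCount;
    double optimalCost = cost.applyAsDouble(currentTaskCount);
    for (int candidate : candidates) {
      double c = cost.applyAsDouble(candidate);
      if (c < optimalCost) {
        optimal = candidate;
        optimalCost = c;
      }
    }
    return optimal == currentTaskCount ? -1 : optimal;
  }

  public static void main(String[] args)
  {
    // Toy cost: lag penalty below 4 tasks, idle penalty growing with task count.
    IntToDoubleFunction cost = n -> (n < 4 ? 100.0 / n : 0.0) + 2.0 * n;
    System.out.println(chooseTaskCount(2, new int[]{1, 2, 4, 8}, cost)); // 4
    System.out.println(chooseTaskCount(4, new int[]{2, 4, 8}, cost));    // -1, already optimal
  }
}
```

Seeding with the current task count (rather than an infinite-cost sentinel, as before the patch) is what prevents the spurious scale-downs: a candidate now has to strictly beat the status quo.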
@@ -474,9 +485,12 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
}
final LagStats lagStats = supervisor.computeLagStats();
+ final double avgPartitionLag;
if (lagStats == null) {
log.debug("Lag stats unavailable for supervisorId [%s], skipping
collection", supervisorId);
- return null;
+ avgPartitionLag = -1;
+ } else {
+ avgPartitionLag = lagStats.getAvgLag();
}
final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
@@ -484,12 +498,7 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
final Map<String, Map<String, Object>> taskStats = supervisor.getStats();
final double movingAvgRate = extractMovingAverage(taskStats);
- // If moving average is not available, we stop scaling effort.
- if (movingAvgRate < 0) {
- return null;
- }
final double pollIdleRatio = extractPollIdleRatio(taskStats);
- final double avgPartitionLag = lagStats.getAvgLag();
return new CostMetrics(
avgPartitionLag,
@@ -501,6 +510,56 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
);
}
+ private static String constructCostTable(int[] taskCounts, CostResult[]
results)
+ {
+ final StringBuilder table = new StringBuilder();
+ table.append("Task count ");
+ for (int taskCount : taskCounts) {
+ table.append(StringUtils.format("| %8d", taskCount));
+ }
+ table.append("\nLag cost ");
+ for (CostResult result : results) {
+ table.append(StringUtils.format("| %8.1f", result.lagCost()));
+ }
+ table.append("\nIdle cost ");
+ for (CostResult result : results) {
+ table.append(StringUtils.format("| %8.1f", result.idleCost()));
+ }
+ table.append("\nTotal cost ");
+ for (CostResult result : results) {
+ table.append(StringUtils.format("| %8.1f", result.totalCost()));
+ }
+
+ return table.toString();
+ }
+
+ /**
+ * Checks if the given metrics are valid for auto-scaling. If they are not
+ * valid, auto-scaling will be skipped until fresh metrics are available.
+ *
+ * @return {@code Either.error} with the reason if the metrics are unusable,
+ * or {@code Either.value(true)} if scaling may proceed.
+ */
+ private Either<String, Boolean> validateMetricsForScaling(CostMetrics
metrics)
+ {
+ if (metrics == null) {
+ return Either.error("No metrics collected");
+ } else if (metrics.getAvgProcessingRate() < 0 ||
metrics.getPollIdleRatio() < 0) {
+ return Either.error("Task metrics not available");
+ } else if (metrics.getCurrentTaskCount() <= 0 ||
metrics.getPartitionCount() <= 0) {
+ return Either.error("Supervisor metrics not available");
+ } else if (metrics.getAvgPartitionLag() < 0) {
+ return Either.error("Lag metrics not available");
+ } else if (metrics.getAvgProcessingRate() < 1 &&
metrics.getAvgPartitionLag() > MAX_IDLENESS_PARTITION_LAG) {
+ log.makeAlert(
+ "Supervisor[%s] has high partition lag[%.0f] but processing rate is
zero. Check if the tasks are stuck.",
+ supervisorId, metrics.getAvgPartitionLag()
+ ).emit();
+ return Either.error("Lag is high but processing rate is zero");
+ } else {
+ return Either.value(true);
+ }
+ }
+
/**
* Determines if a scale action is currently allowed based on the elapsed
time
* since the last scale action and the configured minimum scale-down delay.
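The validation chain in `validateMetricsForScaling` above can be sketched without Druid's `Either` type. This simplified version returns an error string, or null when the metrics are usable; the names and the null-for-success convention are illustrative only.

```java
public class MetricsValidationSketch
{
  static final int MAX_IDLENESS_PARTITION_LAG = 10_000;

  // Returns a description of why the metrics cannot be used for scaling, or null if they are valid.
  static String validate(
      double avgProcessingRate,
      double pollIdleRatio,
      int taskCount,
      int partitionCount,
      double avgPartitionLag
  )
  {
    if (avgProcessingRate < 0 || pollIdleRatio < 0) {
      return "Task metrics not available";
    } else if (taskCount <= 0 || partitionCount <= 0) {
      return "Supervisor metrics not available";
    } else if (avgPartitionLag < 0) {
      return "Lag metrics not available";
    } else if (avgProcessingRate < 1 && avgPartitionLag > MAX_IDLENESS_PARTITION_LAG) {
      // The real code also raises an alert here, since stuck tasks are the likely cause.
      return "Lag is high but processing rate is zero";
    } else {
      return null; // metrics are valid for scaling
    }
  }

  public static void main(String[] args)
  {
    System.out.println(validate(500.0, 0.2, 4, 16, 1_000.0));  // null -> scaling may proceed
    System.out.println(validate(0.0, 0.2, 4, 16, 50_000.0));   // stuck-tasks case, skipped
  }
}
```

Negative values act as sentinels for "metric not collected", which is why the earlier collectMetrics change substitutes -1 instead of returning null.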
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostMetrics.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostMetrics.java
index 8204ff0b6e5..8938ad11b96 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostMetrics.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostMetrics.java
@@ -34,7 +34,7 @@ public class CostMetrics
private final double pollIdleRatio;
private final long taskDurationSeconds;
private final double avgProcessingRate;
- private double aggregateLag;
+ private final double aggregateLag;
public CostMetrics(
double avgPartitionLag,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
index 42096dd61e1..1ad946ff517 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
@@ -25,16 +25,10 @@ package
org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
*/
public class CostResult
{
-
private final double totalCost;
private final double lagCost;
private final double idleCost;
- public CostResult()
- {
- this(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY,
Double.POSITIVE_INFINITY);
- }
-
/**
* @param totalCost the weighted sum of lagCost and idleCost
* @param lagCost the weighted cost representing expected time (seconds)
to recover current lag
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
index 0f5ea6083c7..8fe4f9d8d06 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.logger.Logger;
/**
@@ -35,6 +36,13 @@ public class WeightedCostFunction
*/
static final double LAG_AMPLIFICATION_MULTIPLIER = 0.05;
+ /**
+ * Minimum processing rate for any task, in records per second. This is used
+ * as a floor on the observed average rate so that cost computations do not
+ * return unboundedly large lag recovery times when the rate is very low.
+ */
+ static final double MIN_PROCESSING_RATE = 1_000;
+
/**
* Computes cost for a given task count using compute time metrics.
* <p>
@@ -62,14 +70,8 @@ public class WeightedCostFunction
final double avgProcessingRate = metrics.getAvgProcessingRate();
final double lagRecoveryTime;
- if (avgProcessingRate <= 0) {
- // Metrics are unavailable - favor maintaining the current task count.
- // We're conservative about scale up, but won't let an unlikey scale
down to happen.
- if (proposedTaskCount == metrics.getCurrentTaskCount()) {
- return new CostResult(0.01d, 0.0, 0.0);
- } else {
- return new CostResult(Double.POSITIVE_INFINITY,
Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY);
- }
+ if (avgProcessingRate < 0) {
+ throw DruidException.defensive("Avg processing rate[%.2f] must not be
negative.", avgProcessingRate);
} else {
// Lag recovery time is decreasing by adding tasks and increasing by
ejecting tasks.
// In case of increasing lag, we apply an amplification factor to
reflect the urgency of addressing lag.
@@ -79,7 +81,8 @@ public class WeightedCostFunction
} else {
final double lagPerPartition = metrics.getAggregateLag() /
metrics.getPartitionCount();
final double amplification = Math.max(1.0, 1.0 +
LAG_AMPLIFICATION_MULTIPLIER * Math.log(lagPerPartition));
- lagRecoveryTime = metrics.getAggregateLag() * amplification /
(proposedTaskCount * avgProcessingRate);
+ final double adjustedProcessingRate = Math.max(avgProcessingRate,
MIN_PROCESSING_RATE);
+ lagRecoveryTime = metrics.getAggregateLag() * amplification /
(proposedTaskCount * adjustedProcessingRate);
}
}
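The adjusted lag-recovery computation above can be sketched as a standalone method. The amplification formula and the `MIN_PROCESSING_RATE` floor mirror the patched code; the zero-lag short-circuit is an assumption added here for self-containment, and the class name is illustrative.

```java
public class LagRecoverySketch
{
  static final double LAG_AMPLIFICATION_MULTIPLIER = 0.05;
  static final double MIN_PROCESSING_RATE = 1_000;

  // Expected time (seconds) to drain the aggregate lag with the proposed task count.
  static double lagRecoveryTime(double aggregateLag, int partitionCount, int proposedTaskCount, double avgProcessingRate)
  {
    if (aggregateLag <= 0) {
      return 0; // assumed short-circuit: nothing to recover
    }
    final double lagPerPartition = aggregateLag / partitionCount;
    // Higher per-partition lag amplifies the cost, reflecting the urgency of catching up.
    final double amplification = Math.max(1.0, 1.0 + LAG_AMPLIFICATION_MULTIPLIER * Math.log(lagPerPartition));
    // Floor the rate so a near-zero observed rate cannot yield an unbounded recovery time.
    final double adjustedRate = Math.max(avgProcessingRate, MIN_PROCESSING_RATE);
    return aggregateLag * amplification / (proposedTaskCount * adjustedRate);
  }

  public static void main(String[] args)
  {
    // Doubling the task count halves the recovery time for the same lag and rate.
    System.out.printf("%.1f%n", lagRecoveryTime(1_000_000, 10, 5, 2_000));
    System.out.printf("%.1f%n", lagRecoveryTime(1_000_000, 10, 10, 2_000));
  }
}
```

The floor replaces the old infinite-cost special case: instead of vetoing every non-current task count when the rate was unknown, the cost stays finite and comparable across candidates.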
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
index a64ca5a6b6b..4377452ec5d 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
@@ -203,7 +203,7 @@ public class SegmentTransactionalInsertActionTest
);
Assert.assertEquals(
- SegmentPublishResult.retryableFailure(
+ SegmentPublishResult.fail(
"The new start metadata state[ObjectMetadata{theObject=[1]}] is"
+ " ahead of the last committed end state[null]. Try resetting the
supervisor."
),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
index b39e6097903..0bf697a2a09 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
@@ -23,11 +23,14 @@ import com.google.common.base.Optional;
import
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
+import org.apache.druid.query.DruidMetrics;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.util.List;
+
public class SeekableStreamSupervisorScaleDuringTaskRolloverTest extends
SeekableStreamSupervisorTestBase
{
private static final int DEFAULT_TASK_COUNT = 10;
@@ -45,7 +48,7 @@ public class
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
}
@Test
- public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale()
+ public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale()
throws Exception
{
// Given
setupSpecExpectations(createIOConfig(DEFAULT_TASK_COUNT, null));
@@ -65,7 +68,7 @@ public class
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
}
@Test
- public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotScale()
+ public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotScale() throws Exception
{
// Given
setupSpecExpectations(getIOConfigWithCostBasedAutoScaler());
@@ -93,7 +96,7 @@ public class SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
}
@Test
- public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScaling()
+ public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScaling() throws Exception
{
// Given
final int targetTaskCount = 5;
@@ -102,6 +105,9 @@ public class SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject()))
.andReturn(createMockAutoScaler(targetTaskCount))
.anyTimes();
+ EasyMock.expect(spec.getContextValue(EasyMock.eq(DruidMetrics.TAGS)))
+ .andReturn(List.of("tag1", "tag2"))
+ .anyTimes();
EasyMock.replay(spec);
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(100);
@@ -121,7 +127,7 @@ public class SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
}
@Test
- public void test_maybeScaleDuringTaskRollover_rolloverCountZero_doesNotScale()
+ public void test_maybeScaleDuringTaskRollover_rolloverCountZero_doesNotScale() throws Exception
{
// Given
setupSpecExpectations(getIOConfigWithCostBasedAutoScaler());
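The rollover tests above pin down a guard: scaling during task rollover happens only when an autoscaler is configured and at least one task actually rolled over. A self-contained sketch of that guard (hypothetical names; the real logic lives in `SeekableStreamSupervisor.maybeScaleDuringTaskRollover`):

```java
// Hypothetical sketch of the scale-on-rollover guard exercised by the tests.
final class RolloverScaler {
    /**
     * Returns the task count to use after rollover, or the current count
     * when no scaling should happen.
     */
    static int scaleOnRollover(Integer autoScalerTarget, int rolloverCount, int currentTaskCount) {
        // No autoscaler configured: keep the current count.
        if (autoScalerTarget == null) {
            return currentTaskCount;
        }
        // Zero or negative rollover count: nothing rolled over, do not scale.
        if (rolloverCount <= 0) {
            return currentTaskCount;
        }
        return autoScalerTarget;
    }
}
```

Each of the four test cases above maps to one branch of this guard: no autoscaler, non-positive count, zero count, and the positive-count path that performs the scaling.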
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index db37e3014e3..3f534de8068 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -2760,7 +2760,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
"stream",
null,
1,
- 99,
+ null,
new Period("PT1H"),
new Period("PT1S"),
new Period("PT30S"),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
index 45c1c2b772e..7b46c4c6558 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
@@ -32,6 +32,7 @@ import org.mockito.Mockito;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -372,6 +373,10 @@ public class CostBasedAutoScalerMockTest
AVG_PROCESSING_RATE
);
doReturn(metrics).when(autoScaler).collectMetrics();
+
+ SeekableStreamSupervisorIOConfig ioConfig = mock(SeekableStreamSupervisorIOConfig.class);
+ doReturn(ioConfig).when(mockSupervisor).getIoConfig();
+ doReturn(taskCount).when(ioConfig).getTaskCount();
}
private CostMetrics createMetrics(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 86504458b75..0eea53b16ed 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -484,6 +484,7 @@ public class CostBasedAutoScalerTest
when(spec.isSuspended()).thenReturn(false);
when(supervisor.getIoConfig()).thenReturn(ioConfig);
when(ioConfig.getStream()).thenReturn("test-stream");
+ when(ioConfig.getTaskDuration()).thenReturn(Duration.standardHours(1));
when(supervisor.computeLagStats()).thenReturn(new LagStats(100, 100, 100));
// No task stats means the moving average rate is unavailable
when(supervisor.getStats()).thenReturn(Collections.emptyMap());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
index c4133c0093b..f96df30049e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+import org.apache.druid.error.DruidException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -156,20 +157,26 @@ public class WeightedCostFunctionTest
}
@Test
- public void testNoProcessingRateFavorsCurrentTaskCount()
+ public void testZeroProcessingRateUsesDefaultRate()
{
- // When the processing rate is unavailable (0), the cost function should favor
- // maintaining the current task count, rather to scale up decisions with incomplete data.
+ // When the processing rate is zero, the cost function uses a default rate and tries to recover lag
int currentTaskCount = 10;
- CostMetrics metricsNoRate = createMetricsWithRate(50000.0, currentTaskCount, 100, 0.3, 0.0);
-
+ CostMetrics metricsNoRate = createMetricsWithRate(50000.0, currentTaskCount, 100, 1.0, 0.0);
+
+ final CostBasedAutoScalerConfig config = CostBasedAutoScalerConfig
+ .builder()
+ .taskCountMin(1)
+ .taskCountMax(100)
+ .idleWeight(0)
+ .lagWeight(1)
+ .build();
double costAtCurrent = costFunction.computeCost(metricsNoRate, currentTaskCount, config).totalCost();
double costScaleUp = costFunction.computeCost(metricsNoRate, currentTaskCount + 5, config).totalCost();
double costScaleDown = costFunction.computeCost(metricsNoRate, currentTaskCount - 5, config).totalCost();
Assert.assertTrue(
"Cost at current should be less than cost for scale up",
- costAtCurrent < costScaleUp
+ costAtCurrent > costScaleUp
);
Assert.assertTrue(
"Cost at current should be less than cost for scale down",
@@ -178,29 +185,12 @@ public class WeightedCostFunctionTest
}
@Test
- public void testNoProcessingRateDeviationPenaltyIsSymmetric()
+ public void testNegativeProcessingRate_throwsDefensiveException()
{
- // Deviation penalty should be symmetric around current task count
- int currentTaskCount = 10;
- CostMetrics metricsNoRate = createMetricsWithRate(50000.0, currentTaskCount, 100, 0.5, 0.0);
-
- // Use lag-only config to isolate the lag recovery time component
- CostBasedAutoScalerConfig lagOnlyConfig = CostBasedAutoScalerConfig.builder()
- .taskCountMax(100)
- .taskCountMin(1)
- .enableTaskAutoScaler(true)
- .lagWeight(1.0)
- .idleWeight(0.0)
- .build();
-
- double costUp5 = costFunction.computeCost(metricsNoRate, currentTaskCount + 5, lagOnlyConfig).totalCost();
- double costDown5 = costFunction.computeCost(metricsNoRate, currentTaskCount - 5, lagOnlyConfig).totalCost();
-
- Assert.assertEquals(
- "Lag cost for +5 and -5 deviation should be equal",
- costUp5,
- costDown5,
- 0.001
+ final CostMetrics metrics = createMetricsWithRate(50000.0, 1, 100, 0.5, -1);
+ Assert.assertThrows(
+ DruidException.class,
+ () -> costFunction.computeCost(metrics, 2, config)
);
}
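The reworked tests pin down two behaviours of the cost function: a zero processing rate falls back to a default rate (so lag still drives scale-up), and a negative rate is rejected outright. A minimal sketch of that contract (the fallback constant, formula, and exception type are illustrative assumptions, not Druid's actual `WeightedCostFunction`):

```java
// Hypothetical sketch of the lag-recovery component of a weighted cost function.
final class LagCost {
    // Illustrative fallback when no processing rate has been measured yet.
    static final double DEFAULT_ROWS_PER_SECOND = 1000.0;

    /** Estimated seconds to recover the given lag with taskCount tasks. */
    static double lagRecoverySeconds(double lagRows, double rowsPerSecondPerTask, int taskCount) {
        if (rowsPerSecondPerTask < 0) {
            // Defensive: a negative rate indicates a metrics bug upstream.
            throw new IllegalStateException("Negative processing rate: " + rowsPerSecondPerTask);
        }
        double rate = rowsPerSecondPerTask == 0 ? DEFAULT_ROWS_PER_SECOND : rowsPerSecondPerTask;
        return lagRows / (rate * taskCount);
    }
}
```

Under this shape, a zero measured rate no longer freezes the task count: with a lag-only weighting, more tasks always shortens the estimated recovery time, which is why the assertion in the test flipped from `costAtCurrent < costScaleUp` to `costAtCurrent > costScaleUp`.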
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
index eb84b50399a..74e65f1228b 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
@@ -33,8 +33,12 @@ import java.util.function.Function;
public abstract class TransactionalSegmentPublisher
{
- private static final int QUIET_RETRIES = 3;
- private static final int MAX_RETRIES = 5;
+ private static final int QUIET_RETRIES = 5;
+
+ /**
+ * Approximately 10 minutes of retrying using {@link RetryUtils#nextRetrySleepMillis(int)}.
+ */
+ private static final int MAX_RETRIES = 13;
/**
* Publish segments, along with some commit metadata, in a single transaction.
@@ -104,7 +108,7 @@ public abstract class TransactionalSegmentPublisher
/**
* Sleeps until the next attempt.
*/
- private static void awaitNextRetry(SegmentPublishResult lastResult, int attemptCount)
+ void awaitNextRetry(SegmentPublishResult lastResult, int attemptCount)
{
try {
RetryUtils.awaitNextRetry(
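The new `MAX_RETRIES = 13` is justified by the javadoc as roughly 10 minutes of retrying. A back-of-the-envelope check, assuming the usual capped exponential backoff shape of a 1 s base doubling up to a 60 s cap (the exact constants inside `RetryUtils` are an assumption here, and jitter is ignored):

```java
// Sketch: total sleep budget for n retries of capped exponential backoff.
final class BackoffBudget {
    static long totalSleepSeconds(int retries, long baseSeconds, long capSeconds) {
        long total = 0;
        for (int attempt = 1; attempt <= retries; attempt++) {
            // Double the base each attempt, capped at capSeconds.
            long sleep = Math.min(capSeconds, baseSeconds << (attempt - 1));
            total += sleep;
        }
        return total;
    }
}
```

With those assumed constants, 13 retries sleep 1+2+4+8+16+32 s, then seven sleeps at the 60 s cap: 483 s total, about 8 minutes before jitter, consistent with the "approximately 10 minutes" in the comment.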
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java
index 884b475893d..c4eec7106a7 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java
@@ -46,7 +46,7 @@ public class TransactionalSegmentPublisherTest
SegmentPublishResult.retryableFailure("this error is retryable"),
publisher.publishSegments(null, Set.of(), Function.identity(), null, null)
);
- Assert.assertEquals(6, attemptCount.get());
+ Assert.assertEquals(14, attemptCount.get());
}
@Test
@@ -99,6 +99,12 @@ public class TransactionalSegmentPublisherTest
attemptCount.incrementAndGet();
return publishResult;
}
+
+ @Override
+ void awaitNextRetry(SegmentPublishResult lastResult, int attemptCount)
+ {
+ // return immediately
+ }
};
}
}
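Relaxing `awaitNextRetry` from `private static` to package-private instance scope is what lets the test override it as a no-op, so all 14 attempts run instantly instead of sleeping through the backoff. The general pattern, sketched with hypothetical names:

```java
// Sketch of the "overridable sleep" test seam used by the test above.
class RetryingPublisher {
    int attempts = 0;

    final boolean publishWithRetries(int maxRetries) {
        // One initial attempt plus up to maxRetries retries.
        for (int i = 0; i <= maxRetries; i++) {
            attempts++;
            if (tryPublish()) {
                return true;
            }
            awaitNextRetry(i);  // package-private: tests can stub it out
        }
        return false;
    }

    boolean tryPublish() {
        return false;  // always fails in this sketch
    }

    // Production would sleep with backoff here; tests override this
    // to return immediately so retry-exhaustion paths stay fast.
    void awaitNextRetry(int attempt) {
    }
}
```

With `maxRetries = 13` this makes 14 attempts, matching the updated `Assert.assertEquals(14, attemptCount.get())` expectation.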
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]