This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ff6fc385d1f Inject new task count calculation during the rollover
(#18860)
ff6fc385d1f is described below
commit ff6fc385d1f13d9cd5044d989c53775f5553c66c
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Mon Jan 5 15:22:24 2026 +0200
Inject new task count calculation during the rollover (#18860)
Changes:
- Scale up/down tasks during task rollover
- Add class `CostResult`
- Enhance cost calculation in new auto-scaler
---
.../CostBasedAutoScalerIntegrationTest.java | 74 ++-
.../overlord/supervisor/SupervisorManager.java | 2 +-
.../supervisor/SeekableStreamSupervisor.java | 48 +-
.../supervisor/autoscaler/CostBasedAutoScaler.java | 186 ++++---
.../supervisor/autoscaler/CostResult.java | 64 +++
.../autoscaler/WeightedCostFunction.java | 25 +-
.../overlord/supervisor/SupervisorManagerTest.java | 45 +-
.../SeekableStreamSupervisorSpecTest.java | 333 ++++++++---
...treamSupervisorScaleDuringTaskRolloverTest.java | 612 +++++++++++++++++++++
.../SeekableStreamSupervisorStateTest.java | 22 +
.../autoscaler/CostBasedAutoScalerTest.java | 340 ++++++++++--
.../autoscaler/WeightedCostFunctionTest.java | 54 +-
.../indexing/overlord/supervisor/Supervisor.java | 6 +
.../autoscaler/SupervisorTaskAutoScaler.java | 11 +
14 files changed, 1554 insertions(+), 268 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
index c5b86688ea1..140c4626d22 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
@@ -41,7 +41,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
-import org.joda.time.Seconds;
+import org.joda.time.Period;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -59,6 +59,7 @@ import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.Cos
* <p>
* Tests the autoscaler's ability to compute optimal task counts based on
partition count and cost metrics (lag and idle time).
*/
+@SuppressWarnings("resource")
public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
{
private static final String TOPIC =
EmbeddedClusterApis.createTestDatasourceName();
@@ -95,18 +96,11 @@ public class CostBasedAutoScalerIntegrationTest extends
EmbeddedClusterTestBase
}
};
- // Increase worker capacity to handle more tasks
indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
- .addProperty("druid.worker.capacity", "60");
-
- overlord.addProperty("druid.indexer.task.default.context",
"{\"useConcurrentLocks\": true}")
- .addProperty("druid.manager.segments.useIncrementalCache",
"ifSynced")
- .addProperty("druid.manager.segments.pollDuration", "PT0.1s");
-
- coordinator.addProperty("druid.manager.segments.useIncrementalCache",
"ifSynced");
+ .addProperty("druid.worker.capacity", "100");
cluster.useLatchableEmitter()
- .useDefaultTimeoutForLatchableEmitter(120)
+ .useDefaultTimeoutForLatchableEmitter(60)
.addServer(coordinator)
.addServer(overlord)
.addServer(indexer)
@@ -162,7 +156,6 @@ public class CostBasedAutoScalerIntegrationTest extends
EmbeddedClusterTestBase
}
@Test
- @Timeout(125)
public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
{
final String superId = dataSource + "_super_scaleup";
@@ -215,6 +208,63 @@ public class CostBasedAutoScalerIntegrationTest extends
EmbeddedClusterTestBase
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
}
+ @Test
+ void test_scaleDownDuringTaskRollover()
+ {
+ final String superId = dataSource + "_super";
+ final int initialTaskCount = 10;
+
+ final CostBasedAutoScalerConfig autoScalerConfig =
CostBasedAutoScalerConfig
+ .builder()
+ .enableTaskAutoScaler(true)
+ .taskCountMin(1)
+ .taskCountMax(10)
+ .taskCountStart(initialTaskCount)
+ .scaleActionPeriodMillis(2000)
+ .minTriggerScaleActionFrequencyMillis(2000)
+ // High idle weight ensures scale-down when tasks are mostly idle
(little data to process)
+ .lagWeight(0.1)
+ .idleWeight(0.9)
+ .build();
+
+ final KafkaSupervisorSpec spec =
createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig,
initialTaskCount);
+
+ // Submit the supervisor
+ Assertions.assertEquals(superId, cluster.callApi().postSupervisor(spec));
+
+ // Wait for at least one task running for the datasource managed by the
supervisor.
+ overlord.latchableEmitter().waitForEvent(e ->
e.hasMetricName("task/run/time")
+
.hasDimension(DruidMetrics.DATASOURCE, dataSource));
+
+ // Wait for autoscaler to emit metric indicating scale-down, it should be
just less than the current task count.
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
+ .hasValueMatching(Matchers.lessThan((long)
initialTaskCount)));
+
+ // Wait for tasks to complete (first rollover)
+ overlord.latchableEmitter().waitForEvent(e ->
e.hasMetricName("task/action/success/count"));
+
+ // Wait for the task running for the datasource managed by a supervisor.
+ overlord.latchableEmitter().waitForEvent(e ->
e.hasMetricName("task/run/time")
+
.hasDimension(DruidMetrics.DATASOURCE, dataSource));
+
+ // After rollover, verify that the running task count has decreased
+ // The autoscaler should have recommended fewer tasks due to high idle time
+ final int postRolloverRunningTasks =
cluster.callApi().getTaskCount("running", dataSource);
+
+ Assertions.assertTrue(
+ postRolloverRunningTasks < initialTaskCount,
+ StringUtils.format(
+ "Expected running task count to decrease after rollover. Initial:
%d, After rollover: %d",
+ initialTaskCount,
+ postRolloverRunningTasks
+ )
+ );
+
+ // Suspend the supervisor to clean up
+ cluster.callApi().postSupervisor(spec.createSuspendedSpec());
+ }
+
private void produceRecordsToKafka(int recordCount, int iterations)
{
int recordCountPerSlice = recordCount / iterations;
@@ -258,7 +308,7 @@ public class CostBasedAutoScalerIntegrationTest extends
EmbeddedClusterTestBase
ioConfig -> ioConfig
.withConsumerProperties(kafkaServer.consumerProperties())
.withTaskCount(taskCount)
- .withTaskDuration(Seconds.THREE.toPeriod())
+ .withTaskDuration(Period.seconds(7))
.withAutoScalerConfig(autoScalerConfig)
)
.withId(supervisorId)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 21d2a626501..0cd799bed54 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -502,7 +502,7 @@ public class SupervisorManager
SupervisorTaskAutoScaler autoscaler;
try {
supervisor = spec.createSupervisor();
- autoscaler = spec.createAutoscaler(supervisor);
+ autoscaler = supervisor.createAutoscaler(spec);
supervisor.start();
if (autoscaler != null) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 90adc68c799..c5d3ffe873d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -61,8 +61,10 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.StreamSupervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
@@ -885,7 +887,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
/**
* Tag for identifying this supervisor in thread-names, listeners, etc. tag
= (type + supervisorId).
- */
+ */
private final String supervisorTag;
private final TaskInfoProvider taskInfoProvider;
private final RowIngestionMetersFactory rowIngestionMetersFactory;
@@ -926,6 +928,12 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private volatile boolean lifecycleStarted = false;
private final ServiceEmitter emitter;
+ /**
+ * Reference to the autoscaler, used for rollover-based scale-down decisions.
+ * Wired by {@link SupervisorManager} after supervisor creation.
+ */
+ private volatile SupervisorTaskAutoScaler taskAutoScaler;
+
// snapshots latest sequences from the stream to be verified in the next run
cycle of inactive stream check
private final Map<PartitionIdType, SequenceOffsetType>
previousSequencesFromStream = new HashMap<>();
private long lastActiveTimeMillis;
@@ -1306,7 +1314,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (log.isDebugEnabled()) {
log.debug(
"Handled notice[%s] from notices queue in [%d] ms, "
- + "current notices queue size [%d] for
supervisor[%s] for datasource[%s].",
+ + "current notices queue size [%d] for
supervisor[%s] for datasource[%s].",
noticeType, noticeHandleTime.millisElapsed(),
getNoticesQueueSize(), supervisorId, dataSource
);
}
@@ -1677,6 +1685,13 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return limitedParseErrors;
}
+ @Override
+ public SupervisorTaskAutoScaler createAutoscaler(SupervisorSpec spec)
+ {
+ this.taskAutoScaler = spec.createAutoscaler(this);
+ return this.taskAutoScaler;
+ }
+
@VisibleForTesting
public TaskGroup addTaskGroupToActivelyReadingTaskGroup(
int taskGroupId,
@@ -3428,6 +3443,29 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// remove this task group from the list of current task groups now that
it has been handled
activelyReadingTaskGroups.remove(groupId);
}
+
+ maybeScaleDuringTaskRollover();
+ }
+
+ /**
+ * Scales up or down the number of tasks during a task rollover, if
applicable.
+ * <p>
+ * This method is invoked to determine whether a task count adjustment is
needed
+ * during a task rollover based on the recommendations from the task
auto-scaler.
+ */
+ @VisibleForTesting
+ void maybeScaleDuringTaskRollover()
+ {
+ if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
+ int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
+ if (rolloverTaskCount > 0) {
+ log.info("Autoscaler recommends scaling down to [%d] tasks during
rollover", rolloverTaskCount);
+ changeTaskCountInIOConfig(rolloverTaskCount);
+      // Here we force-reset the supervisor state so it is re-calculated on the
next iteration of the runInternal() call.
+      // This seems the best way to inject task count recalculation during
the rollover.
+ clearAllocationInfo();
+ }
+ }
}
private DateTime computeEarliestTaskStartTime(TaskGroup group)
@@ -4688,9 +4726,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// Try emitting lag even with stale metrics provided that none of the
partitions has negative lag
final long staleMillis = sequenceLastUpdated == null
- ? 0
- : DateTimes.nowUtc().getMillis()
- - (tuningConfig.getOffsetFetchPeriod().getMillis() +
sequenceLastUpdated.getMillis());
+ ? 0
+ : DateTimes.nowUtc().getMillis()
+ -
(tuningConfig.getOffsetFetchPeriod().getMillis() +
sequenceLastUpdated.getMillis());
if (staleMillis > 0 && partitionLags.values().stream().anyMatch(x -> x
< 0)) {
// Log at most once every twenty supervisor runs to reduce noise in
the logs
if ((staleMillis / getIoConfig().getPeriod().getMillis()) % 20 == 0)
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 9ce08f93660..14e3ca2cf5e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -33,10 +33,9 @@ import
org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.RowIngestionMeters;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashSet;
import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -58,8 +57,8 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
private static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
private static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK =
MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
- public static final String AVG_LAG_METRIC =
"task/autoScaler/costBased/avgLag";
- public static final String AVG_IDLE_METRIC =
"task/autoScaler/costBased/pollIdleAvg";
+ public static final String LAG_COST_METRIC =
"task/autoScaler/costBased/lagCost";
+ public static final String IDLE_COST_METRIC =
"task/autoScaler/costBased/idleCost";
public static final String OPTIMAL_TASK_COUNT_METRIC =
"task/autoScaler/costBased/optimalTaskCount";
private final String supervisorId;
@@ -70,6 +69,7 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
private final ServiceMetricEvent.Builder metricBuilder;
private final ScheduledExecutorService autoscalerExecutor;
private final WeightedCostFunction costFunction;
+ private volatile CostMetrics lastKnownMetrics;
public CostBasedAutoScaler(
SeekableStreamSupervisor supervisor,
@@ -86,7 +86,8 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
this.costFunction = new WeightedCostFunction();
- this.autoscalerExecutor =
Execs.scheduledSingleThreaded("CostBasedAutoScaler-" +
StringUtils.encodeForFormat(spec.getId()));
+ this.autoscalerExecutor =
Execs.scheduledSingleThreaded("CostBasedAutoScaler-"
+ +
StringUtils.encodeForFormat(spec.getId()));
this.metricBuilder = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
.setDimension(
@@ -98,12 +99,8 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
@Override
public void start()
{
- Callable<Integer> scaleAction = () ->
computeOptimalTaskCount(this.collectMetrics());
- Runnable onSuccessfulScale = () -> {
- };
-
autoscalerExecutor.scheduleAtFixedRate(
- supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale,
emitter),
+
supervisor.buildDynamicAllocationTask(this::computeTaskCountForScaleAction, ()
-> {}, emitter),
config.getScaleActionPeriodMillis(),
config.getScaleActionPeriodMillis(),
TimeUnit.MILLISECONDS
@@ -129,46 +126,25 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
// No-op.
}
- private CostMetrics collectMetrics()
+ @Override
+ public int computeTaskCountForRollover()
{
- if (spec.isSuspended()) {
- log.debug("Supervisor [%s] is suspended, skipping a metrics collection",
supervisorId);
- return null;
- }
-
- final LagStats lagStats = supervisor.computeLagStats();
- if (lagStats == null) {
- log.debug("Lag stats unavailable for supervisorId [%s], skipping
collection", supervisorId);
- return null;
- }
-
- final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
- final int partitionCount = supervisor.getPartitionCount();
-
- final Map<String, Map<String, Object>> taskStats = supervisor.getStats();
- final double movingAvgRate = extractMovingAverage(taskStats,
DropwizardRowIngestionMeters.ONE_MINUTE_NAME);
- final double pollIdleRatio = extractPollIdleRatio(taskStats);
+ return computeOptimalTaskCount(lastKnownMetrics);
+ }
- final double avgPartitionLag = lagStats.getAvgLag();
+ public int computeTaskCountForScaleAction()
+ {
+ lastKnownMetrics = collectMetrics();
+ final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
+ final int currentTaskCount = lastKnownMetrics.getCurrentTaskCount();
- // Use an actual 15-minute moving average processing rate if available
- final double avgProcessingRate;
- if (movingAvgRate > 0) {
- avgProcessingRate = movingAvgRate;
- } else {
- // Fallback: estimate processing rate based on idle ratio
- final double utilizationRatio = Math.max(0.01, 1.0 - pollIdleRatio);
- avgProcessingRate = config.getDefaultProcessingRate() * utilizationRatio;
- }
+ // Perform only scale-up actions
+ return optimalTaskCount >= currentTaskCount ? optimalTaskCount : -1;
+ }
- return new CostMetrics(
- avgPartitionLag,
- currentTaskCount,
- partitionCount,
- pollIdleRatio,
- supervisor.getIoConfig().getTaskDuration().getStandardSeconds(),
- avgProcessingRate
- );
+ public CostBasedAutoScalerConfig getConfig()
+ {
+ return config;
}
/**
@@ -184,7 +160,7 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
*
* @return optimal task count for scale-up, or -1 if no scaling action needed
*/
- public int computeOptimalTaskCount(CostMetrics metrics)
+ int computeOptimalTaskCount(CostMetrics metrics)
{
if (metrics == null) {
log.debug("No metrics available yet for supervisorId [%s]",
supervisorId);
@@ -204,33 +180,28 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
return -1;
}
- // If idle is already in the ideal range [0.2, 0.6], optimal utilization
has been achieved.
- // No scaling is needed - maintain stability by staying at the current
task count.
- final double currentIdleRatio = metrics.getPollIdleRatio();
- if (currentIdleRatio >= 0 &&
WeightedCostFunction.isIdleInIdealRange(currentIdleRatio)) {
- log.debug(
- "Idle ratio [%.3f] is in ideal range for supervisorId [%s], no
scaling needed",
- currentIdleRatio,
- supervisorId
- );
- return -1;
- }
-
int optimalTaskCount = -1;
- double optimalCost = Double.POSITIVE_INFINITY;
+ CostResult optimalCost = new CostResult();
for (int taskCount : validTaskCounts) {
- double cost = costFunction.computeCost(metrics, taskCount, config);
- log.debug("Proposed task count: %d, Cost: %.4f", taskCount, cost);
- if (cost < optimalCost) {
+ CostResult costResult = costFunction.computeCost(metrics, taskCount,
config);
+ double cost = costResult.totalCost();
+ log.debug(
+ "Proposed task count: %d, Cost: %.4f (lag: %.4f, idle: %.4f)",
+ taskCount,
+ cost,
+ costResult.lagCost(),
+ costResult.idleCost()
+ );
+ if (cost < optimalCost.totalCost()) {
optimalTaskCount = taskCount;
- optimalCost = cost;
+ optimalCost = costResult;
}
}
- emitter.emit(metricBuilder.setMetric(AVG_LAG_METRIC,
metrics.getAvgPartitionLag()));
- emitter.emit(metricBuilder.setMetric(AVG_IDLE_METRIC,
metrics.getPollIdleRatio()));
emitter.emit(metricBuilder.setMetric(OPTIMAL_TASK_COUNT_METRIC, (long)
optimalTaskCount));
+ emitter.emit(metricBuilder.setMetric(LAG_COST_METRIC,
optimalCost.lagCost()));
+ emitter.emit(metricBuilder.setMetric(IDLE_COST_METRIC,
optimalCost.idleCost()));
log.debug(
"Cost-based scaling evaluation for supervisorId [%s]: current=%d,
optimal=%d, cost=%.4f, "
@@ -238,7 +209,7 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
supervisorId,
metrics.getCurrentTaskCount(),
optimalTaskCount,
- optimalCost,
+ optimalCost.totalCost(),
metrics.getAvgPartitionLag(),
metrics.getPollIdleRatio()
);
@@ -246,8 +217,8 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
if (optimalTaskCount == currentTaskCount) {
return -1;
}
- // Perform both scale-up and scale-down proactively
- // Future versions may perform scale-down on task rollover only
+
+ // Scale up is performed eagerly.
return optimalTaskCount;
}
@@ -264,26 +235,22 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
return new int[]{};
}
- List<Integer> result = new ArrayList<>();
+ Set<Integer> result = new HashSet<>();
final int currentPartitionsPerTask = partitionCount / currentTaskCount;
// Minimum partitions per task correspond to the maximum number of tasks
(scale up) and vice versa.
final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask -
MAX_INCREASE_IN_PARTITIONS_PER_TASK);
- final int maxPartitionsPerTask = Math.min(partitionCount,
currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK);
+ final int maxPartitionsPerTask = Math.min(
+ partitionCount,
+ currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
+ );
for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >=
minPartitionsPerTask; partitionsPerTask--) {
final int taskCount = (partitionCount + partitionsPerTask - 1) /
partitionsPerTask;
- if (result.isEmpty() || result.get(result.size() - 1) != taskCount) {
- result.add(taskCount);
- }
+ result.add(taskCount);
}
return result.stream().mapToInt(Integer::intValue).toArray();
}
- public CostBasedAutoScalerConfig getConfig()
- {
- return config;
- }
-
/**
* Extracts the average poll-idle-ratio metric from task stats.
* This metric indicates how much time the consumer spends idle waiting for
data.
@@ -324,9 +291,9 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
*
* @param taskStats the stats map from supervisor.getStats()
* @return the average 15-minute processing rate across all tasks in
records/second,
- * or -1 if no valid metrics are available
+ * or -1 if no valid metrics are available
*/
- static double extractMovingAverage(Map<String, Map<String, Object>>
taskStats, String movingAveragePeriodKey)
+ static double extractMovingAverage(Map<String, Map<String, Object>>
taskStats)
{
if (taskStats == null || taskStats.isEmpty()) {
return -1;
@@ -341,9 +308,15 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
if (movingAveragesObj instanceof Map) {
Object buildSegmentsObj = ((Map<?, ?>)
movingAveragesObj).get(RowIngestionMeters.BUILD_SEGMENTS);
if (buildSegmentsObj instanceof Map) {
- Object fifteenMinObj = ((Map<?, ?>)
buildSegmentsObj).get(movingAveragePeriodKey);
- if (fifteenMinObj instanceof Map) {
- Object processedRate = ((Map<?, ?>)
fifteenMinObj).get(RowIngestionMeters.PROCESSED);
+ Object movingAvgObj = ((Map<?, ?>)
buildSegmentsObj).get(DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME);
+ if (movingAvgObj == null) {
+ movingAvgObj = ((Map<?, ?>)
buildSegmentsObj).get(DropwizardRowIngestionMeters.FIVE_MINUTE_NAME);
+ if (movingAvgObj == null) {
+ movingAvgObj = ((Map<?, ?>)
buildSegmentsObj).get(DropwizardRowIngestionMeters.ONE_MINUTE_NAME);
+ }
+ }
+ if (movingAvgObj instanceof Map) {
+ Object processedRate = ((Map<?, ?>)
movingAvgObj).get(RowIngestionMeters.PROCESSED);
if (processedRate instanceof Number) {
sum += ((Number) processedRate).doubleValue();
count++;
@@ -357,4 +330,47 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
return count > 0 ? sum / count : -1;
}
+
+ private CostMetrics collectMetrics()
+ {
+ if (spec.isSuspended()) {
+ log.debug("Supervisor [%s] is suspended, skipping a metrics collection",
supervisorId);
+ return null;
+ }
+
+ final LagStats lagStats = supervisor.computeLagStats();
+ if (lagStats == null) {
+ log.debug("Lag stats unavailable for supervisorId [%s], skipping
collection", supervisorId);
+ return null;
+ }
+
+ final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+ final int partitionCount = supervisor.getPartitionCount();
+
+ final Map<String, Map<String, Object>> taskStats = supervisor.getStats();
+ final double movingAvgRate = extractMovingAverage(taskStats);
+ final double pollIdleRatio = extractPollIdleRatio(taskStats);
+
+ final double avgPartitionLag = lagStats.getAvgLag();
+
+    // Use an actual moving-average processing rate (15-, 5-, or 1-minute) if available
+ final double avgProcessingRate;
+ if (movingAvgRate > 0) {
+ avgProcessingRate = movingAvgRate;
+ } else {
+ // Fallback: estimate processing rate based on the idle ratio
+ final double utilizationRatio = Math.max(0.01, 1.0 - pollIdleRatio);
+ avgProcessingRate = config.getDefaultProcessingRate() * utilizationRatio;
+ }
+
+ return new CostMetrics(
+ avgPartitionLag,
+ currentTaskCount,
+ partitionCount,
+ pollIdleRatio,
+ supervisor.getIoConfig().getTaskDuration().getStandardSeconds(),
+ avgProcessingRate
+ );
+ }
+
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
new file mode 100644
index 00000000000..42096dd61e1
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+/**
+ * Holds the result of a cost computation from {@link
WeightedCostFunction#computeCost}.
+ * All costs are measured in seconds.
+ */
+public class CostResult
+{
+
+ private final double totalCost;
+ private final double lagCost;
+ private final double idleCost;
+
+ public CostResult()
+ {
+ this(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY,
Double.POSITIVE_INFINITY);
+ }
+
+ /**
+ * @param totalCost the weighted sum of lagCost and idleCost
+ * @param lagCost the weighted cost representing expected time (seconds)
to recover current lag
+ * @param idleCost the weighted cost representing total compute time
(seconds) wasted being idle per task duration
+ */
+ public CostResult(double totalCost, double lagCost, double idleCost)
+ {
+ this.totalCost = totalCost;
+ this.lagCost = lagCost;
+ this.idleCost = idleCost;
+ }
+
+ public double totalCost()
+ {
+ return totalCost;
+ }
+
+ public double lagCost()
+ {
+ return lagCost;
+ }
+
+ public double idleCost()
+ {
+ return idleCost;
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
index 1af8233527a..0da733ef9e7 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
@@ -58,14 +58,15 @@ public class WeightedCostFunction
* </ul>
* <p>
* Formula: {@code lagWeight * lagRecoveryTime + idleWeight * idlenessCost}.
- * This approach directly connects costs to operational metricsю
+ * This approach directly connects costs to operational metrics.
*
- * @return cost score in seconds, or {@link Double#POSITIVE_INFINITY} for
invalid inputs
+ * @return CostResult containing totalCost, lagCost, and idleCost,
+ * or result with {@link Double#POSITIVE_INFINITY} for invalid inputs
*/
- public double computeCost(CostMetrics metrics, int proposedTaskCount,
CostBasedAutoScalerConfig config)
+ public CostResult computeCost(CostMetrics metrics, int proposedTaskCount,
CostBasedAutoScalerConfig config)
{
if (metrics == null || config == null || proposedTaskCount <= 0 ||
metrics.getPartitionCount() <= 0) {
- return Double.POSITIVE_INFINITY;
+ return new CostResult(Double.POSITIVE_INFINITY,
Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY);
}
final double avgProcessingRate = metrics.getAvgProcessingRate();
@@ -74,9 +75,9 @@ public class WeightedCostFunction
// Metrics are unavailable - favor maintaining the current task count.
// We're conservative about scale up, but won't let an unlikey scale
down to happen.
if (proposedTaskCount == metrics.getCurrentTaskCount()) {
- return 0.01d;
+ return new CostResult(0.01d, 0.0, 0.0);
} else {
- return Double.POSITIVE_INFINITY;
+ return new CostResult(Double.POSITIVE_INFINITY,
Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY);
}
} else {
// Lag recovery time is decreasing by adding tasks and increasing by
ejecting tasks.
@@ -86,19 +87,21 @@ public class WeightedCostFunction
final double predictedIdleRatio = estimateIdleRatio(metrics,
proposedTaskCount);
final double idleCost = proposedTaskCount *
metrics.getTaskDurationSeconds() * predictedIdleRatio;
- final double cost = config.getLagWeight() * lagRecoveryTime +
config.getIdleWeight() * idleCost;
+ final double lagCost = config.getLagWeight() * lagRecoveryTime;
+ final double weightedIdleCost = config.getIdleWeight() * idleCost;
+ final double cost = lagCost + weightedIdleCost;
log.debug(
- "Cost for taskCount[%d]: lagRecoveryTime[%.2fs], idleCost[%.2fs], "
+ "Cost for taskCount[%d]: lagCost[%.2fs], idleCost[%.2fs], "
+ "predictedIdle[%.3f], finalCost[%.2fs]",
proposedTaskCount,
- lagRecoveryTime,
- idleCost,
+ lagCost,
+ weightedIdleCost,
predictedIdleRatio,
cost
);
- return cost;
+ return new CostResult(cost, lagCost, weightedIdleCost);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index b61c93af3c8..7794a798473 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -102,7 +102,9 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
metadataSupervisorManager.insert("id1", spec);
supervisor3.start();
+
EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.start();
+
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();
manager.start();
@@ -115,6 +117,7 @@ public class SupervisorManagerTest extends EasyMockSupport
resetAll();
supervisor2.start();
+
EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.stop(true);
replayAll();
@@ -164,6 +167,7 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
metadataSupervisorManager.insert("id1", spec);
supervisor1.start();
+
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();
manager.start();
@@ -225,6 +229,7 @@ public class SupervisorManagerTest extends EasyMockSupport
);
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
+
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();
manager.start();
Assert.assertFalse(manager.shouldUpdateSupervisor(spec));
@@ -249,6 +254,7 @@ public class SupervisorManagerTest extends EasyMockSupport
);
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
+
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
exception.expect(DruidException.class);
replayAll();
manager.start();
@@ -332,6 +338,7 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
+
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.expect(supervisor1.getStatus()).andReturn(report);
replayAll();
@@ -352,6 +359,7 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
+
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.handoffTaskGroupsEarly(ImmutableList.of(1));
replayAll();
@@ -373,6 +381,7 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor3.start();
+
EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();
@@ -414,6 +423,8 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor3.start();
+
EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
+
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.start();
EasyMock.expectLastCall().andThrow(new RuntimeException("supervisor
explosion"));
replayAll();
@@ -433,7 +444,9 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap());
metadataSupervisorManager.insert(EasyMock.eq("id1"),
EasyMock.capture(capturedInsert));
supervisor1.start();
+
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.stop(true);
+
EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor2.start();
EasyMock.expectLastCall().andThrow(new RuntimeException("supervisor failed
to start"));
replayAll();
@@ -461,6 +474,7 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
+
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.stopAsync();
EasyMock.expectLastCall().andThrow(new RuntimeException("RTE"));
replayAll();
@@ -479,6 +493,7 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
+
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.reset(EasyMock.anyObject(DataSourceMetadata.class));
replayAll();
@@ -498,6 +513,7 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor3.start();
+
EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();
manager.start();
@@ -533,6 +549,7 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
+
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.resetOffsets(datasourceMetadata);
replayAll();
@@ -558,7 +575,9 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
metadataSupervisorManager.insert("id1", spec);
supervisor3.start();
+
EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.start();
+
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();
manager.start();
@@ -574,6 +593,7 @@ public class SupervisorManagerTest extends EasyMockSupport
resetAll();
metadataSupervisorManager.insert(EasyMock.eq("id1"),
EasyMock.capture(capturedInsert));
supervisor2.start();
+
EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.stop(true);
replayAll();
@@ -589,6 +609,7 @@ public class SupervisorManagerTest extends EasyMockSupport
metadataSupervisorManager.insert(EasyMock.eq("id1"),
EasyMock.capture(capturedInsert));
supervisor2.stop(true);
supervisor1.start();
+
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();
manager.suspendOrResumeSupervisor("id1", false);
@@ -631,29 +652,29 @@ public class SupervisorManagerTest extends EasyMockSupport
metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
SeekableStreamSupervisorSpec suspendedSpec =
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
- Supervisor suspendedSupervisor =
EasyMock.mock(SeekableStreamSupervisor.class);
+ Supervisor suspendedSupervisor =
EasyMock.createNiceMock(SeekableStreamSupervisor.class);
EasyMock.expect(suspendedSpec.getId()).andReturn("suspendedSpec").anyTimes();
EasyMock.expect(suspendedSpec.isSuspended()).andReturn(true).anyTimes();
EasyMock.expect(suspendedSpec.getDataSources()).andReturn(ImmutableList.of("suspendedDS")).anyTimes();
EasyMock.expect(suspendedSpec.createSupervisor()).andReturn(suspendedSupervisor).anyTimes();
EasyMock.expect(suspendedSpec.createAutoscaler(suspendedSupervisor)).andReturn(null).anyTimes();
EasyMock.expect(suspendedSpec.getContext()).andReturn(null).anyTimes();
- EasyMock.replay(suspendedSpec);
+ EasyMock.replay(suspendedSpec, suspendedSupervisor);
metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
SeekableStreamSupervisorSpec activeSpec =
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
- Supervisor activeSupervisor =
EasyMock.mock(SeekableStreamSupervisor.class);
+ Supervisor activeSupervisor =
EasyMock.createNiceMock(SeekableStreamSupervisor.class);
EasyMock.expect(activeSpec.getId()).andReturn("activeSpec").anyTimes();
EasyMock.expect(activeSpec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(activeSpec.getDataSources()).andReturn(ImmutableList.of("activeDS")).anyTimes();
EasyMock.expect(activeSpec.createSupervisor()).andReturn(activeSupervisor).anyTimes();
EasyMock.expect(activeSpec.createAutoscaler(activeSupervisor)).andReturn(null).anyTimes();
EasyMock.expect(activeSpec.getContext()).andReturn(null).anyTimes();
- EasyMock.replay(activeSpec);
+ EasyMock.replay(activeSpec, activeSupervisor);
metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
SeekableStreamSupervisorSpec activeSpecWithConcurrentLocks =
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
- Supervisor activeSupervisorWithConcurrentLocks =
EasyMock.mock(SeekableStreamSupervisor.class);
+ Supervisor activeSupervisorWithConcurrentLocks =
EasyMock.createNiceMock(SeekableStreamSupervisor.class);
EasyMock.expect(activeSpecWithConcurrentLocks.getId()).andReturn("activeSpecWithConcurrentLocks").anyTimes();
EasyMock.expect(activeSpecWithConcurrentLocks.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(activeSpecWithConcurrentLocks.getDataSources())
@@ -664,11 +685,11 @@ public class SupervisorManagerTest extends EasyMockSupport
.andReturn(null).anyTimes();
EasyMock.expect(activeSpecWithConcurrentLocks.getContext())
.andReturn(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS,
true)).anyTimes();
- EasyMock.replay(activeSpecWithConcurrentLocks);
+ EasyMock.replay(activeSpecWithConcurrentLocks,
activeSupervisorWithConcurrentLocks);
metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
SeekableStreamSupervisorSpec activeAppendSpec =
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
- Supervisor activeAppendSupervisor =
EasyMock.mock(SeekableStreamSupervisor.class);
+ Supervisor activeAppendSupervisor =
EasyMock.createNiceMock(SeekableStreamSupervisor.class);
EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes();
EasyMock.expect(activeAppendSpec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(activeAppendSpec.getDataSources()).andReturn(ImmutableList.of("activeAppendDS")).anyTimes();
@@ -678,12 +699,12 @@ public class SupervisorManagerTest extends EasyMockSupport
Tasks.TASK_LOCK_TYPE,
TaskLockType.APPEND.name()
)).anyTimes();
- EasyMock.replay(activeAppendSpec);
+ EasyMock.replay(activeAppendSpec, activeAppendSupervisor);
metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
// A supervisor with useConcurrentLocks set to false explicitly must not
use an append lock
SeekableStreamSupervisorSpec specWithUseConcurrentLocksFalse =
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
- Supervisor supervisorWithUseConcurrentLocksFalse =
EasyMock.mock(SeekableStreamSupervisor.class);
+ Supervisor supervisorWithUseConcurrentLocksFalse =
EasyMock.createNiceMock(SeekableStreamSupervisor.class);
EasyMock.expect(specWithUseConcurrentLocksFalse.getId()).andReturn("useConcurrentLocksFalse").anyTimes();
EasyMock.expect(specWithUseConcurrentLocksFalse.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(specWithUseConcurrentLocksFalse.getDataSources())
@@ -697,7 +718,7 @@ public class SupervisorManagerTest extends EasyMockSupport
Tasks.TASK_LOCK_TYPE,
TaskLockType.APPEND.name()
)).anyTimes();
- EasyMock.replay(specWithUseConcurrentLocksFalse);
+ EasyMock.replay(specWithUseConcurrentLocksFalse,
supervisorWithUseConcurrentLocksFalse);
metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
replayAll();
@@ -737,7 +758,7 @@ public class SupervisorManagerTest extends EasyMockSupport
metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
SeekableStreamSupervisorSpec streamingSpec =
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
- SeekableStreamSupervisor streamSupervisor =
EasyMock.mock(SeekableStreamSupervisor.class);
+ SeekableStreamSupervisor streamSupervisor =
EasyMock.createNiceMock(SeekableStreamSupervisor.class);
streamSupervisor.registerNewVersionOfPendingSegment(EasyMock.anyObject());
EasyMock.expectLastCall().once();
EasyMock.expect(streamingSpec.getId()).andReturn("sss").anyTimes();
@@ -746,7 +767,7 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.expect(streamingSpec.createSupervisor()).andReturn(streamSupervisor).anyTimes();
EasyMock.expect(streamingSpec.createAutoscaler(streamSupervisor)).andReturn(null).anyTimes();
EasyMock.expect(streamingSpec.getContext()).andReturn(null).anyTimes();
- EasyMock.replay(streamingSpec);
+ EasyMock.replay(streamingSpec, streamSupervisor);
metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
EasyMock.expectLastCall().once();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 58ffb8438e0..f0b15f8c8a0 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -37,6 +37,7 @@ import
org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
@@ -62,6 +63,7 @@ import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.TestSupervisorSpec;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -678,6 +680,103 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
Assert.assertTrue(autoscaler4 instanceof NoopTaskAutoScaler);
}
+ @Test
+ public void
testAutoScalerReturnsNoopWhenSupervisorIsNotSeekableStreamSupervisor()
+ {
+ // Test the branch where supervisor instanceof SeekableStreamSupervisor is
false
+ HashMap<String, Object> autoScalerConfig = new HashMap<>();
+ autoScalerConfig.put("enableTaskAutoScaler", true);
+ autoScalerConfig.put("taskCountMax", 8);
+ autoScalerConfig.put("taskCountMin", 1);
+
+
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
+ .andReturn(mapper.convertValue(autoScalerConfig,
AutoScalerConfig.class))
+ .anyTimes();
+
EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes();
+ EasyMock.replay(seekableStreamSupervisorIOConfig);
+
+
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
+
EasyMock.expect(supervisor4.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.replay(supervisor4);
+
+ TestSeekableStreamSupervisorSpec spec = new
TestSeekableStreamSupervisorSpec(
+ ingestionSchema,
+ null,
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig,
+ supervisor4,
+ "id1"
+ );
+
+ // Create a non-SeekableStreamSupervisor mock
+ Supervisor nonSeekableStreamSupervisor = EasyMock.mock(Supervisor.class);
+ EasyMock.replay(nonSeekableStreamSupervisor);
+
+ // When passing a non-SeekableStreamSupervisor, should return
NoopTaskAutoScaler
+ SupervisorTaskAutoScaler autoscaler =
spec.createAutoscaler(nonSeekableStreamSupervisor);
+ Assert.assertTrue(
+ "Should return NoopTaskAutoScaler when supervisor is not
SeekableStreamSupervisor",
+ autoscaler instanceof NoopTaskAutoScaler
+ );
+ }
+
+ @Test
+ public void testAutoScalerReturnsNoopWhenConfigIsNull()
+ {
+ // Test the branch where autoScalerConfig is null
+
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
+ .andReturn(null)
+ .anyTimes();
+
EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes();
+ EasyMock.replay(seekableStreamSupervisorIOConfig);
+
+
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
+
EasyMock.expect(supervisor4.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.replay(supervisor4);
+
+ TestSeekableStreamSupervisorSpec spec = new
TestSeekableStreamSupervisorSpec(
+ ingestionSchema,
+ null,
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig,
+ supervisor4,
+ "id1"
+ );
+
+ // When autoScalerConfig is null, should return NoopTaskAutoScaler
+ SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4);
+ Assert.assertTrue(
+ "Should return NoopTaskAutoScaler when autoScalerConfig is null",
+ autoscaler instanceof NoopTaskAutoScaler
+ );
+ }
+
@Test
public void testDefaultAutoScalerConfigCreatedWithDefault()
{
@@ -857,71 +956,6 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
autoScaler.stop();
}
- @Test
- public void
test_dynamicAllocationNotice_skipsScalingAndEmitsReason_ifTasksArePublishing()
throws InterruptedException
- {
- EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
-
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
-
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
- EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1,
true)).anyTimes();
-
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
- EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
- EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
- EasyMock.replay(spec);
-
-
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
-
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
-
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
- EasyMock.replay(ingestionSchema);
-
-
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
-
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
- EasyMock.replay(taskMaster);
-
- StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
- TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(10);
-
- LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
- supervisor,
- DATASOURCE,
- mapper.convertValue(
- getScaleOutProperties(2),
- LagBasedAutoScalerConfig.class
- ),
- spec,
- dynamicActionEmitter
- );
-
- supervisor.addTaskGroupToPendingCompletionTaskGroup(
- 0,
- ImmutableMap.of("0", "0"),
- null,
- null,
- Set.of("dummyTask"),
- Collections.emptySet()
- );
-
- supervisor.start();
- autoScaler.start();
-
- supervisor.runInternal();
- Thread.sleep(1000); // ensure a dynamic allocation notice completes
-
- Assert.assertEquals(1, supervisor.getIoConfig().getTaskCount().intValue());
- Assert.assertTrue(
- dynamicActionEmitter
-
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
- .stream()
- .map(metric ->
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
- .filter(Objects::nonNull)
- .anyMatch("There are tasks pending completion"::equals)
- );
-
-
emitter.verifyNotEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC);
- autoScaler.reset();
- autoScaler.stop();
- }
-
@Test
public void testSeekableStreamSupervisorSpecWithNoScalingOnIdleSupervisor()
throws InterruptedException
{
@@ -1593,6 +1627,175 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
originalSpec.validateSpecUpdateTo(proposedSpecSameSource);
}
+ @Test
+ public void
test_dynamicAllocationNotice_skipsScalingAndEmitsReason_ifTasksArePublishing()
throws InterruptedException
+ {
+ EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
+
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1,
true)).anyTimes();
+
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.replay(spec);
+
+
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.replay(taskMaster);
+
+ StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
+ TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(10);
+
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+ supervisor,
+ DATASOURCE,
+ mapper.convertValue(
+ getScaleOutProperties(2),
+ LagBasedAutoScalerConfig.class
+ ),
+ spec,
+ dynamicActionEmitter
+ );
+
+ supervisor.addTaskGroupToPendingCompletionTaskGroup(
+ 0,
+ ImmutableMap.of("0", "0"),
+ null,
+ null,
+ Set.of("dummyTask"),
+ Collections.emptySet()
+ );
+
+ supervisor.start();
+ autoScaler.start();
+
+ supervisor.runInternal();
+ Thread.sleep(1000); // ensure a dynamic allocation notice completes
+
+ Assert.assertEquals(1, supervisor.getIoConfig().getTaskCount().intValue());
+ Assert.assertTrue(
+ dynamicActionEmitter
+
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+ .stream()
+ .map(metric ->
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
+ .filter(Objects::nonNull)
+ .anyMatch("There are tasks pending completion"::equals)
+ );
+
+
emitter.verifyNotEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC);
+ autoScaler.reset();
+ autoScaler.stop();
+ }
+
+ @Test
+ public void test_dynamicAllocationNotice_skips_whenSupervisorSuspended()
throws InterruptedException
+ {
+ EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
+
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1,
true)).anyTimes();
+
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ // Suspended → DynamicAllocationTasksNotice should return early and not
scale
+ EasyMock.expect(spec.isSuspended()).andReturn(true).anyTimes();
+ EasyMock.replay(spec);
+
+
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.replay(taskMaster);
+
+ TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(3);
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+ supervisor,
+ DATASOURCE,
+ mapper.convertValue(
+ getScaleOutProperties(2),
+ LagBasedAutoScalerConfig.class
+ ),
+ spec,
+ emitter
+ );
+
+ supervisor.start();
+ autoScaler.start();
+ supervisor.runInternal();
+
+ int before = supervisor.getIoConfig().getTaskCount();
+ Thread.sleep(1000);
+ int after = supervisor.getIoConfig().getTaskCount();
+ // No scaling expected because supervisor is suspended
+ Assert.assertEquals(before, after);
+
+ autoScaler.reset();
+ autoScaler.stop();
+ }
+
+ @Test
+ public void
test_changeTaskCountInIOConfig_handlesExceptionAndStillUpdatesTaskCount()
throws InterruptedException
+ {
+ EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
+
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1,
true)).anyTimes();
+
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.replay(spec);
+
+
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ // SupervisorManager present but metadata insert fails → should be handled
+ SupervisorManager sm = EasyMock.createMock(SupervisorManager.class);
+ MetadataSupervisorManager msm =
EasyMock.createMock(MetadataSupervisorManager.class);
+
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(sm)).anyTimes();
+
EasyMock.expect(sm.getMetadataSupervisorManager()).andReturn(msm).anyTimes();
+ msm.insert(EasyMock.anyString(), EasyMock.anyObject());
+ EasyMock.expectLastCall().andThrow(new
RuntimeException("boom")).anyTimes();
+ EasyMock.replay(taskMaster, sm, msm);
+
+ TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(10);
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+ supervisor,
+ DATASOURCE,
+ mapper.convertValue(
+ getScaleOutProperties(2),
+ LagBasedAutoScalerConfig.class
+ ),
+ spec,
+ emitter
+ );
+
+ supervisor.start();
+ autoScaler.start();
+ supervisor.runInternal();
+
+ int before = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(1, before);
+ Thread.sleep(1000); // allow one dynamic allocation cycle
+ int after = supervisor.getIoConfig().getTaskCount();
+ // Even though metadata insert failed, taskCount should still be updated
in ioConfig
+ Assert.assertEquals(2, after);
+
+ autoScaler.reset();
+ autoScaler.stop();
+ }
+
@Test
public void testMergeSpecConfigs()
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
new file mode 100644
index 00000000000..3e4bcd92b99
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskStorage;
+import
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
+import
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
+import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
+import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
+import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
+import
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class SeekableStreamSupervisorScaleDuringTaskRolloverTest extends
EasyMockSupport
+{
+ private static final ObjectMapper OBJECT_MAPPER =
TestHelper.makeJsonMapper();
+ private static final String STREAM = "stream";
+ private static final String DATASOURCE = "testDS";
+ private static final String SUPERVISOR = "supervisor";
+ private static final int DEFAULT_TASK_COUNT = 10;
+
+ private TaskStorage taskStorage;
+ private TaskMaster taskMaster;
+ private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
+ private ServiceEmitter emitter;
+ private RowIngestionMetersFactory rowIngestionMetersFactory;
+ private SeekableStreamIndexTaskClientFactory taskClientFactory;
+ private SeekableStreamSupervisorSpec spec;
+ private SupervisorStateManagerConfig supervisorConfig;
+
+ @Before
+ public void setUp()
+ {
+ taskStorage = EasyMock.mock(TaskStorage.class);
+ taskMaster = EasyMock.mock(TaskMaster.class);
+ indexerMetadataStorageCoordinator =
EasyMock.mock(IndexerMetadataStorageCoordinator.class);
+ emitter = new StubServiceEmitter();
+ rowIngestionMetersFactory = EasyMock.mock(RowIngestionMetersFactory.class);
+ taskClientFactory =
EasyMock.mock(SeekableStreamIndexTaskClientFactory.class);
+ spec = EasyMock.mock(SeekableStreamSupervisorSpec.class);
+ supervisorConfig = new SupervisorStateManagerConfig();
+
+ // Common taskMaster setup - used by all tests
+
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.replay(taskMaster);
+ }
+
+ @Test
+ public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale()
+ {
+ // Given
+ setupSpecExpectations(getIOConfigWithoutAutoScaler(5));
+
EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
+ EasyMock.replay(spec);
+
+ TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(10);
+ supervisor.start();
+
+ int beforeTaskCount = supervisor.getIoConfig().getTaskCount();
+
+ // When
+ supervisor.maybeScaleDuringTaskRollover();
+
+ // Then
+ Assert.assertEquals(
+ "Task count should not change when taskAutoScaler is null",
+ beforeTaskCount,
+ (int) supervisor.getIoConfig().getTaskCount()
+ );
+ }
+
+ @Test
+ public void
test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotScale()
+ {
+ // Given
+ setupSpecExpectations(getIOConfigWithCostBasedAutoScaler());
+ EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject()))
+ .andReturn(createMockAutoScaler(-1))
+ .anyTimes();
+ EasyMock.replay(spec);
+
+ TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(100);
+ supervisor.start();
+ supervisor.createAutoscaler(spec);
+
+ int beforeTaskCount = supervisor.getIoConfig().getTaskCount();
+
+ // When
+ supervisor.maybeScaleDuringTaskRollover();
+
+ // Then
+ Assert.assertEquals(
+ "Task count should not change when rolloverTaskCount <= 0",
+ beforeTaskCount,
+ (int) supervisor.getIoConfig().getTaskCount()
+ );
+ }
+
+ @Test
+ public void
test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScaling()
+ {
+ // Given
+ final int targetTaskCount = 5;
+
+ setupSpecExpectations(getIOConfigWithCostBasedAutoScaler());
+ EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject()))
+ .andReturn(createMockAutoScaler(targetTaskCount))
+ .anyTimes();
+ EasyMock.replay(spec);
+
+ TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(100);
+ supervisor.start();
+ supervisor.createAutoscaler(spec);
+
+ Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
+
+ // When
+ supervisor.maybeScaleDuringTaskRollover();
+
+ // Then
+ Assert.assertEquals(
+ "Task count should be updated to " + targetTaskCount + " when
rolloverTaskCount > 0",
+ targetTaskCount,
+ (int) supervisor.getIoConfig().getTaskCount()
+ );
+ }
+
+ @Test
+ public void
test_maybeScaleDuringTaskRollover_rolloverCountZero_doesNotScale()
+ {
+ // Given
+ setupSpecExpectations(getIOConfigWithCostBasedAutoScaler());
+ EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject()))
+ .andReturn(createMockAutoScaler(0))
+ .anyTimes();
+ EasyMock.replay(spec);
+
+ TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(100);
+ supervisor.start();
+ supervisor.createAutoscaler(spec);
+
+ int beforeTaskCount = supervisor.getIoConfig().getTaskCount();
+
+ // When
+ supervisor.maybeScaleDuringTaskRollover();
+
+ // Then
+ Assert.assertEquals(
+ "Task count should not change when rolloverTaskCount is 0",
+ beforeTaskCount,
+ (int) supervisor.getIoConfig().getTaskCount()
+ );
+ }
+
+ // Helper methods for test setup
+
+ /**
+ * Sets up common spec expectations. Call EasyMock.replay(spec) after this
and any additional expectations.
+ */
+ private void setupSpecExpectations(SeekableStreamSupervisorIOConfig ioConfig)
+ {
+ EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
+
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
+
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ }
+
+ /**
+ * Creates a mock autoscaler that returns the specified rollover count.
+ */
+ private static SupervisorTaskAutoScaler createMockAutoScaler(int
rolloverCount)
+ {
+ return new SupervisorTaskAutoScaler()
+ {
+ @Override
+ public void start()
+ {
+ }
+
+ @Override
+ public void stop()
+ {
+ }
+
+ @Override
+ public void reset()
+ {
+ }
+
+ @Override
+ public int computeTaskCountForRollover()
+ {
+ return rolloverCount;
+ }
+ };
+ }
+
+ // Helper methods for config creation
+
+ private static CostBasedAutoScalerConfig getCostBasedAutoScalerConfig()
+ {
+ return CostBasedAutoScalerConfig.builder()
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(0.3)
+ .idleWeight(0.7)
+ .scaleActionPeriodMillis(100)
+ .build();
+ }
+
+ private SeekableStreamSupervisorIOConfig getIOConfigWithCostBasedAutoScaler()
+ {
+ return createIOConfig(DEFAULT_TASK_COUNT, getCostBasedAutoScalerConfig());
+ }
+
+ private SeekableStreamSupervisorIOConfig getIOConfigWithoutAutoScaler(int
taskCount)
+ {
+ return createIOConfig(taskCount, null);
+ }
+
+ private SeekableStreamSupervisorIOConfig createIOConfig(int taskCount,
CostBasedAutoScalerConfig autoScalerConfig)
+ {
+ return new SeekableStreamSupervisorIOConfig(
+ STREAM,
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false, false, false),
+ 1,
+ taskCount,
+ new Period("PT1H"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ autoScalerConfig,
+ LagAggregator.DEFAULT,
+ null,
+ null,
+ null
+ )
+ {
+ };
+ }
+
+ private static DataSchema getDataSchema()
+ {
+ List<DimensionSchema> dimensions = new ArrayList<>();
+ dimensions.add(StringDimensionSchema.create("dim1"));
+ dimensions.add(StringDimensionSchema.create("dim2"));
+
+ return DataSchema.builder()
+ .withDataSource(DATASOURCE)
+ .withTimestamp(new TimestampSpec("timestamp", "iso",
null))
+ .withDimensions(dimensions)
+ .withAggregators(new CountAggregatorFactory("rows"))
+ .withGranularity(
+ new UniformGranularitySpec(
+ Granularities.HOUR,
+ Granularities.NONE,
+ ImmutableList.of()
+ )
+ )
+ .build();
+ }
+
+ private static SeekableStreamSupervisorTuningConfig getTuningConfig()
+ {
+ return new SeekableStreamSupervisorTuningConfig()
+ {
+ @Override
+ public Integer getWorkerThreads()
+ {
+ return 1;
+ }
+
+ @Override
+ public Long getChatRetries()
+ {
+ return 1L;
+ }
+
+ @Override
+ public Duration getHttpTimeout()
+ {
+ return new Period("PT1M").toStandardDuration();
+ }
+
+ @Override
+ public Duration getShutdownTimeout()
+ {
+ return new Period("PT1S").toStandardDuration();
+ }
+
+ @Override
+ public Duration getRepartitionTransitionDuration()
+ {
+ return new Period("PT2M").toStandardDuration();
+ }
+
+ @Override
+ public Duration getOffsetFetchPeriod()
+ {
+ return new Period("PT5M").toStandardDuration();
+ }
+
+ @Override
+ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
+ {
+ return new SeekableStreamIndexTaskTuningConfig(null, null, null, null,
null, null, null, null, null, null,
+ null, null, null, null, null, null, null, null, null, null,
+ null, null, null
+ )
+ {
+ @Override
+ public SeekableStreamIndexTaskTuningConfig
withBasePersistDirectory(File dir)
+ {
+ return null;
+ }
+
+ @Override
+ public String toString()
+ {
+ return null;
+ }
+ };
+ }
+ };
+ }
+
+ // Inner test classes
+
+ private abstract class BaseTestSeekableStreamSupervisor extends
SeekableStreamSupervisor<String, String, ByteEntity>
+ {
+ private BaseTestSeekableStreamSupervisor()
+ {
+ super(
+ "testSupervisorId",
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ OBJECT_MAPPER,
+ spec,
+ rowIngestionMetersFactory,
+ false
+ );
+ }
+
+ @Override
+ protected String baseTaskName()
+ {
+ return "test";
+ }
+
+ @Override
+ protected void updatePartitionLagFromStream()
+ {
+ }
+
+ @Nullable
+ @Override
+ protected Map<String, Long> getPartitionRecordLag()
+ {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ protected Map<String, Long> getPartitionTimeLag()
+ {
+ return null;
+ }
+
+ @Override
+ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
+ int groupId,
+ Map<String, String> startPartitions,
+ Map<String, String> endPartitions,
+ String baseSequenceName,
+ DateTime minimumMessageTime,
+ DateTime maximumMessageTime,
+ Set<String> exclusiveStartSequenceNumberPartitions,
+ SeekableStreamSupervisorIOConfig ioConfig
+ )
+ {
+ return new SeekableStreamIndexTaskIOConfig<>(
+ groupId,
+ baseSequenceName,
+ new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions,
exclusiveStartSequenceNumberPartitions),
+ new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
+ true,
+ minimumMessageTime,
+ maximumMessageTime,
+ ioConfig.getInputFormat(),
+ ioConfig.getTaskDuration().getStandardMinutes()
+ )
+ {
+ };
+ }
+
+ @Override
+ protected List<SeekableStreamIndexTask<String, String, ByteEntity>>
createIndexTasks(
+ int replicas,
+ String baseSequenceName,
+ ObjectMapper sortingMapper,
+ TreeMap<Integer, Map<String, String>> sequenceOffsets,
+ SeekableStreamIndexTaskIOConfig taskIoConfig,
+ SeekableStreamIndexTaskTuningConfig taskTuningConfig,
+ RowIngestionMetersFactory rowIngestionMetersFactory
+ )
+ {
+ return null;
+ }
+
+ @Override
+ protected int getTaskGroupIdForPartition(String partition)
+ {
+ return 0;
+ }
+
+ @Override
+ protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata)
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean doesTaskMatchSupervisor(Task task)
+ {
+ return true;
+ }
+
+ @Override
+ protected SeekableStreamDataSourceMetadata<String, String>
createDataSourceMetaDataForReset(
+ String stream,
+ Map<String, String> map
+ )
+ {
+ return null;
+ }
+
+ @Override
+ protected OrderedSequenceNumber<String> makeSequenceNumber(String seq,
boolean isExclusive)
+ {
+ return new OrderedSequenceNumber<>(seq, isExclusive)
+ {
+ @Override
+ public int compareTo(OrderedSequenceNumber<String> o)
+ {
+ return new BigInteger(this.get()).compareTo(new BigInteger(o.get()));
+ }
+ };
+ }
+
+ @Override
+ protected Map<String, Long> getRecordLagPerPartition(Map<String, String>
currentOffsets)
+ {
+ return null;
+ }
+
+ @Override
+ protected Map<String, Long> getTimeLagPerPartition(Map<String, String>
currentOffsets)
+ {
+ return null;
+ }
+
+ @Override
+ protected RecordSupplier<String, String, ByteEntity> setupRecordSupplier()
+ {
+ return recordSupplier;
+ }
+
+ @Override
+ protected SeekableStreamSupervisorReportPayload<String, String>
createReportPayload(
+ int numPartitions,
+ boolean includeOffsets
+ )
+ {
+ return new SeekableStreamSupervisorReportPayload<>(SUPERVISOR,
DATASOURCE, STREAM, 1, 1, 1L,
+ null, null, null, null, null, null,
+ false, true, null, null, null
+ )
+ {
+ };
+ }
+
+ @Override
+ protected String getNotSetMarker()
+ {
+ return "NOT_SET";
+ }
+
+ @Override
+ protected String getEndOfPartitionMarker()
+ {
+ return "EOF";
+ }
+
+ @Override
+ protected boolean isEndOfShard(String seqNum)
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean isShardExpirationMarker(String seqNum)
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
+ {
+ return false;
+ }
+ }
+
+ private class TestSeekableStreamSupervisor extends
BaseTestSeekableStreamSupervisor
+ {
+ private final int partitionNumbers;
+
+ public TestSeekableStreamSupervisor(int partitionNumbers)
+ {
+ this.partitionNumbers = partitionNumbers;
+ }
+
+ @Override
+ protected void scheduleReporting(ScheduledExecutorService reportingExec)
+ {
+ }
+
+ @Override
+ public LagStats computeLagStats()
+ {
+ return new LagStats(0, 0, 0);
+ }
+
+ @Override
+ public int getPartitionCount()
+ {
+ return partitionNumbers;
+ }
+ }
+}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 5df9edd184d..71e798e7e2f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -2751,6 +2751,28 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
Assert.assertEquals(12, config.getMaxAllowedStops());
}
+ @Test
+ public void testCreateAutoscalerStoresAndReturnsAutoscaler()
+ {
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
+
EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(Map.of()).anyTimes();
+
+ replayAll();
+
+ SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+ // Test that createAutoscaler returns null when spec returns null
+ SeekableStreamSupervisorSpec mockSpec =
EasyMock.createMock(SeekableStreamSupervisorSpec.class);
+
EasyMock.expect(mockSpec.createAutoscaler(supervisor)).andReturn(null).once();
+ EasyMock.replay(mockSpec);
+
+ Assert.assertNull(supervisor.createAutoscaler(mockSpec));
+ EasyMock.verify(mockSpec);
+
+ verifyAll();
+ }
+
private static DataSchema getDataSchema()
{
List<DimensionSchema> dimensions = new ArrayList<>();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 44bbef9c6a1..caf5453f521 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
-import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
@@ -35,6 +34,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import static
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME;
+import static
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIVE_MINUTE_NAME;
+import static
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.ONE_MINUTE_NAME;
import static org.mockito.Mockito.when;
public class CostBasedAutoScalerTest
@@ -69,12 +71,23 @@ public class CostBasedAutoScalerTest
{
// For 100 partitions at 25 tasks (4 partitions/task), valid counts
include 25 and 34
int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(100,
25);
+ Assert.assertTrue("Should contain the current task count",
contains(validTaskCounts, 25));
+ Assert.assertTrue("Should contain the next scale-up option",
contains(validTaskCounts, 34));
- Assert.assertTrue("Should contain current task count",
contains(validTaskCounts, 25));
- Assert.assertTrue("Should contain next scale-up option",
contains(validTaskCounts, 34));
+ // Edge cases
+ Assert.assertEquals("Zero partitions return empty array", 0,
CostBasedAutoScaler.computeValidTaskCounts(0, 10).length);
+ Assert.assertEquals("Negative partitions return empty array", 0,
CostBasedAutoScaler.computeValidTaskCounts(-5, 10).length);
- // Edge case: zero partitions returns empty array
- Assert.assertEquals(0, CostBasedAutoScaler.computeValidTaskCounts(0,
10).length);
+ // Single partition
+ int[] singlePartition = CostBasedAutoScaler.computeValidTaskCounts(1, 1);
+ Assert.assertTrue("Single partition should have at least one valid count",
singlePartition.length > 0);
+ Assert.assertTrue("Single partition should contain 1",
contains(singlePartition, 1));
+
+ // Current exceeds partitions - should still yield valid, deduplicated
options
+ int[] exceedsPartitions = CostBasedAutoScaler.computeValidTaskCounts(2, 5);
+ Assert.assertEquals(2, exceedsPartitions.length);
+ Assert.assertTrue(contains(exceedsPartitions, 1));
+ Assert.assertTrue(contains(exceedsPartitions, 2));
}
@Test
@@ -82,42 +95,24 @@ public class CostBasedAutoScalerTest
{
Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(null));
Assert.assertEquals(-1,
autoScaler.computeOptimalTaskCount(createMetrics(0.0, 10, 0, 0.0)));
- }
-
- @Test
- public void testComputeOptimalTaskCountIdleInIdealRange()
- {
- // When idle is in ideal range [0.2, 0.6], no scaling should occur
- Assert.assertEquals(-1,
autoScaler.computeOptimalTaskCount(createMetrics(5000.0, 25, 100, 0.4)));
+ Assert.assertEquals(-1,
autoScaler.computeOptimalTaskCount(createMetrics(100.0, 10, -5, 0.3)));
+ Assert.assertEquals(-1,
autoScaler.computeOptimalTaskCount(createMetrics(100.0, -1, 100, 0.3)));
}
@Test
public void testComputeOptimalTaskCountScaling()
{
// High idle (underutilized) - should scale down
- // With high idle (0.8), the algorithm evaluates lower task counts and
finds they have lower idle cost
int scaleDownResult =
autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8));
Assert.assertTrue("Should scale down when idle > 0.6", scaleDownResult <
25);
- }
- @Test
- public void
testComputeOptimalTaskCountLowIdleDoesNotScaleUpWithBalancedWeights()
- {
- // With corrected idle ratio model and marginal lag model, low idle does
not
- // automatically trigger scale-up. The algorithm is conservative because:
- // 1. Scale-up increases idle cost (more tasks = more idle per task with
fixed load)
- // 2. Marginal lag model means only ADDITIONAL tasks work on backlog
- //
- // This is intentional: the idle-heavy weights (0.4 idle) make the
algorithm
- // favor stability over aggressive scaling
- int result = autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25,
100, 0.1));
-
- // Algorithm evaluates costs and may find current count optimal
- // or may scale down if idle cost reduction outweighs lag increase
- Assert.assertTrue(
- "With low idle and balanced weights, algorithm should not scale up
aggressively",
- result == -1 || result <= 25
- );
+ // Very high idle with high task count - should scale down
+ int highIdleResult =
autoScaler.computeOptimalTaskCount(createMetrics(10.0, 50, 100, 0.9));
+ Assert.assertTrue("Scale down scenario should return optimal <= current",
highIdleResult <= 50);
+
+ // With low idle and balanced weights, algorithm should not scale up
aggressively
+ int lowIdleResult =
autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1));
+ Assert.assertTrue("With low idle and balanced weights, should not scale up
aggressively", lowIdleResult <= 25);
}
@Test
@@ -142,35 +137,219 @@ public class CostBasedAutoScalerTest
}
@Test
- public void testExtractProcessingRateMovingAverage()
+ public void testExtractPollIdleRatioInvalidTypes()
+ {
+ // Non-map task metric
+ Map<String, Map<String, Object>> nonMapTask = new HashMap<>();
+ nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
+ Assert.assertEquals(0.,
CostBasedAutoScaler.extractPollIdleRatio(nonMapTask), 0.0001);
+
+ // Empty autoscaler metrics
+ Map<String, Map<String, Object>> emptyAutoscaler = new HashMap<>();
+ Map<String, Object> taskStats1 = new HashMap<>();
+ taskStats1.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, new
HashMap<>());
+ emptyAutoscaler.put("0", Collections.singletonMap("task-0", taskStats1));
+ Assert.assertEquals(0.,
CostBasedAutoScaler.extractPollIdleRatio(emptyAutoscaler), 0.0001);
+
+ // Non-map autoscaler metrics
+ Map<String, Map<String, Object>> nonMapAutoscaler = new HashMap<>();
+ Map<String, Object> taskStats2 = new HashMap<>();
+ taskStats2.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY,
"not-a-map");
+ nonMapAutoscaler.put("0", Collections.singletonMap("task-0", taskStats2));
+ Assert.assertEquals(0.,
CostBasedAutoScaler.extractPollIdleRatio(nonMapAutoscaler), 0.0001);
+
+ // Non-number poll idle ratio
+ Map<String, Map<String, Object>> nonNumberRatio = new HashMap<>();
+ Map<String, Object> taskStats3 = new HashMap<>();
+ Map<String, Object> autoscalerMetrics = new HashMap<>();
+ autoscalerMetrics.put(SeekableStreamIndexTaskRunner.POLL_IDLE_RATIO_KEY,
"not-a-number");
+ taskStats3.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY,
autoscalerMetrics);
+ nonNumberRatio.put("0", Collections.singletonMap("task-0", taskStats3));
+ Assert.assertEquals(0.,
CostBasedAutoScaler.extractPollIdleRatio(nonNumberRatio), 0.0001);
+ }
+
+ @Test
+ public void testExtractMovingAverage()
{
// Null and empty return -1
- Assert.assertEquals(
- -1.,
- CostBasedAutoScaler.extractMovingAverage(null,
DropwizardRowIngestionMeters.FIVE_MINUTE_NAME),
- 0.0001
- );
- Assert.assertEquals(
- -1.,
- CostBasedAutoScaler.extractMovingAverage(
- Collections.emptyMap(),
- DropwizardRowIngestionMeters.FIVE_MINUTE_NAME
- ),
- 0.0001
- );
+ Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(null),
0.0001);
+ Assert.assertEquals(-1.,
CostBasedAutoScaler.extractMovingAverage(Collections.emptyMap()), 0.0001);
// Missing metrics return -1
Map<String, Map<String, Object>> missingMetrics = new HashMap<>();
missingMetrics.put("0", Collections.singletonMap("task-0", new
HashMap<>()));
- Assert.assertEquals(-1.,
CostBasedAutoScaler.extractMovingAverage(missingMetrics,
DropwizardRowIngestionMeters.FIVE_MINUTE_NAME), 0.0001);
+ Assert.assertEquals(-1.,
CostBasedAutoScaler.extractMovingAverage(missingMetrics), 0.0001);
- // Valid stats return average
+ // Valid stats return average (using 5-minute)
Map<String, Map<String, Object>> validStats = new HashMap<>();
Map<String, Object> group = new HashMap<>();
group.put("task-0", buildTaskStatsWithMovingAverage(1000.0));
group.put("task-1", buildTaskStatsWithMovingAverage(2000.0));
validStats.put("0", group);
- Assert.assertEquals(1500.0,
CostBasedAutoScaler.extractMovingAverage(validStats,
DropwizardRowIngestionMeters.FIVE_MINUTE_NAME), 0.0001);
+ Assert.assertEquals(1500.0,
CostBasedAutoScaler.extractMovingAverage(validStats), 0.0001);
+ }
+
+ @Test
+ public void testExtractMovingAverageIntervalFallback()
+ {
+ // 15-minute average is preferred
+ Map<String, Map<String, Object>> fifteenMin = new HashMap<>();
+ fifteenMin.put("0", Collections.singletonMap("task-0",
buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 1500.0)));
+ Assert.assertEquals(1500.0,
CostBasedAutoScaler.extractMovingAverage(fifteenMin), 0.0001);
+
+ // 1-minute as final fallback
+ Map<String, Map<String, Object>> oneMin = new HashMap<>();
+ oneMin.put("0", Collections.singletonMap("task-0",
buildTaskStatsWithMovingAverageForInterval(ONE_MINUTE_NAME, 500.0)));
+ Assert.assertEquals(500.0,
CostBasedAutoScaler.extractMovingAverage(oneMin), 0.0001);
+
+ // 15-minute preferred over 5-minute when both available
+ Map<String, Map<String, Object>> allIntervals = new HashMap<>();
+ allIntervals.put("0", Collections.singletonMap("task-0",
buildTaskStatsWithMultipleMovingAverages(1500.0, 1000.0, 500.0)));
+ Assert.assertEquals(1500.0,
CostBasedAutoScaler.extractMovingAverage(allIntervals), 0.0001);
+
+ // Falls back to 5-minute when 15-minute is null
+ Map<String, Map<String, Object>> nullFifteen = new HashMap<>();
+ nullFifteen.put("0", Collections.singletonMap("task-0",
buildTaskStatsWithNullInterval(FIFTEEN_MINUTE_NAME, FIVE_MINUTE_NAME, 750.0)));
+ Assert.assertEquals(750.0,
CostBasedAutoScaler.extractMovingAverage(nullFifteen), 0.0001);
+
+ // Falls back to 1-minute when both 15 and 5 are null
+ Map<String, Map<String, Object>> bothNull = new HashMap<>();
+ bothNull.put("0", Collections.singletonMap("task-0",
buildTaskStatsWithTwoNullIntervals(250.0)));
+ Assert.assertEquals(250.0,
CostBasedAutoScaler.extractMovingAverage(bothNull), 0.0001);
+ }
+
+ @Test
+ public void testExtractMovingAverageInvalidTypes()
+ {
+ // Non-map task metric
+ Map<String, Map<String, Object>> nonMapTask = new HashMap<>();
+ nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
+ Assert.assertEquals(-1.,
CostBasedAutoScaler.extractMovingAverage(nonMapTask), 0.0001);
+
+ // Missing buildSegments
+ Map<String, Map<String, Object>> missingBuild = new HashMap<>();
+ Map<String, Object> taskStats1 = new HashMap<>();
+ taskStats1.put("movingAverages", new HashMap<>());
+ missingBuild.put("0", Collections.singletonMap("task-0", taskStats1));
+ Assert.assertEquals(-1.,
CostBasedAutoScaler.extractMovingAverage(missingBuild), 0.0001);
+
+ // Non-map movingAverages
+ Map<String, Map<String, Object>> nonMapMA = new HashMap<>();
+ Map<String, Object> taskStats2 = new HashMap<>();
+ taskStats2.put("movingAverages", "not-a-map");
+ nonMapMA.put("0", Collections.singletonMap("task-0", taskStats2));
+ Assert.assertEquals(-1.,
CostBasedAutoScaler.extractMovingAverage(nonMapMA), 0.0001);
+
+ // Non-map buildSegments
+ Map<String, Map<String, Object>> nonMapBS = new HashMap<>();
+ Map<String, Object> taskStats3 = new HashMap<>();
+ Map<String, Object> movingAverages3 = new HashMap<>();
+ movingAverages3.put(RowIngestionMeters.BUILD_SEGMENTS, "not-a-map");
+ taskStats3.put("movingAverages", movingAverages3);
+ nonMapBS.put("0", Collections.singletonMap("task-0", taskStats3));
+ Assert.assertEquals(-1.,
CostBasedAutoScaler.extractMovingAverage(nonMapBS), 0.0001);
+
+ // Non-map interval data
+ Map<String, Map<String, Object>> nonMapInterval = new HashMap<>();
+ Map<String, Object> taskStats4 = new HashMap<>();
+ Map<String, Object> movingAverages4 = new HashMap<>();
+ Map<String, Object> buildSegments4 = new HashMap<>();
+ buildSegments4.put(FIFTEEN_MINUTE_NAME, "not-a-map");
+ movingAverages4.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments4);
+ taskStats4.put("movingAverages", movingAverages4);
+ nonMapInterval.put("0", Collections.singletonMap("task-0", taskStats4));
+ Assert.assertEquals(-1.,
CostBasedAutoScaler.extractMovingAverage(nonMapInterval), 0.0001);
+
+ // Non-number processed rate
+ Map<String, Map<String, Object>> nonNumberRate = new HashMap<>();
+ Map<String, Object> taskStats5 = new HashMap<>();
+ Map<String, Object> movingAverages5 = new HashMap<>();
+ Map<String, Object> buildSegments5 = new HashMap<>();
+ Map<String, Object> fifteenMin = new HashMap<>();
+ fifteenMin.put(RowIngestionMeters.PROCESSED, "not-a-number");
+ buildSegments5.put(FIFTEEN_MINUTE_NAME, fifteenMin);
+ movingAverages5.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments5);
+ taskStats5.put("movingAverages", movingAverages5);
+ nonNumberRate.put("0", Collections.singletonMap("task-0", taskStats5));
+ Assert.assertEquals(-1.,
CostBasedAutoScaler.extractMovingAverage(nonNumberRate), 0.0001);
+ }
+
+ @Test
+ public void testComputeTaskCountForRolloverReturnsMinusOneWhenSuspended()
+ {
+ SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
+ SeekableStreamSupervisor supervisor =
Mockito.mock(SeekableStreamSupervisor.class);
+ ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
+ SeekableStreamSupervisorIOConfig ioConfig =
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+ when(spec.getId()).thenReturn("s-up");
+ when(spec.isSuspended()).thenReturn(true);
+ when(supervisor.getIoConfig()).thenReturn(ioConfig);
+ when(ioConfig.getStream()).thenReturn("stream");
+
+ CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(10)
+ .taskCountMin(1)
+
.enableTaskAutoScaler(true)
+ .lagWeight(0.5)
+ .idleWeight(0.5)
+ .build();
+
+ CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg,
spec, emitter);
+ Assert.assertEquals(-1, scaler.computeTaskCountForRollover());
+ }
+
+ @Test
+ public void testComputeTaskCountForRolloverReturnsMinusOneWhenLagStatsNull()
+ {
+ SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
+ SeekableStreamSupervisor supervisor =
Mockito.mock(SeekableStreamSupervisor.class);
+ ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
+ SeekableStreamSupervisorIOConfig ioConfig =
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+ when(spec.getId()).thenReturn("s-up");
+ when(spec.isSuspended()).thenReturn(false);
+ when(supervisor.computeLagStats()).thenReturn(null);
+ when(supervisor.getIoConfig()).thenReturn(ioConfig);
+ when(ioConfig.getStream()).thenReturn("stream");
+
+ CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(10)
+ .taskCountMin(1)
+
.enableTaskAutoScaler(true)
+ .lagWeight(0.5)
+ .idleWeight(0.5)
+ .build();
+
+ CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg,
spec, emitter);
+ Assert.assertEquals(-1, scaler.computeTaskCountForRollover());
+ }
+
+ @Test
+ public void testComputeTaskCountForRolloverReturnsMinusOneWhenNoMetrics()
+ {
+ // Tests the case where lastKnownMetrics is null (no
computeTaskCountForScaleAction called)
+ SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
+ SeekableStreamSupervisor supervisor =
Mockito.mock(SeekableStreamSupervisor.class);
+ ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
+ SeekableStreamSupervisorIOConfig ioConfig =
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+ when(spec.getId()).thenReturn("s-up");
+ when(spec.isSuspended()).thenReturn(false);
+ when(supervisor.getIoConfig()).thenReturn(ioConfig);
+ when(ioConfig.getStream()).thenReturn("stream");
+
+ CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(10)
+ .taskCountMin(1)
+
.enableTaskAutoScaler(true)
+ .lagWeight(0.5)
+ .idleWeight(0.5)
+ .build();
+
+ CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg,
spec, emitter);
+ // Should return -1 when lastKnownMetrics is null
+ Assert.assertEquals(-1, scaler.computeTaskCountForRollover());
}
private CostMetrics createMetrics(
@@ -213,7 +392,68 @@ public class CostBasedAutoScalerTest
private Map<String, Object> buildTaskStatsWithMovingAverage(double
processedRate)
{
Map<String, Object> buildSegments = new HashMap<>();
- buildSegments.put(DropwizardRowIngestionMeters.FIVE_MINUTE_NAME,
Map.of(RowIngestionMeters.PROCESSED, processedRate));
+ buildSegments.put(FIVE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED,
processedRate));
+
+ Map<String, Object> movingAverages = new HashMap<>();
+ movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments);
+
+ Map<String, Object> taskStats = new HashMap<>();
+ taskStats.put("movingAverages", movingAverages);
+ return taskStats;
+ }
+
+ private Map<String, Object>
buildTaskStatsWithMovingAverageForInterval(String intervalName, double
processedRate)
+ {
+ Map<String, Object> buildSegments = new HashMap<>();
+ buildSegments.put(intervalName, Map.of(RowIngestionMeters.PROCESSED,
processedRate));
+
+ Map<String, Object> movingAverages = new HashMap<>();
+ movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments);
+
+ Map<String, Object> taskStats = new HashMap<>();
+ taskStats.put("movingAverages", movingAverages);
+ return taskStats;
+ }
+
+ private Map<String, Object> buildTaskStatsWithMultipleMovingAverages(
+ double fifteenMinRate,
+ double fiveMinRate,
+ double oneMinRate
+ )
+ {
+ Map<String, Object> buildSegments = new HashMap<>();
+ buildSegments.put(FIFTEEN_MINUTE_NAME,
Map.of(RowIngestionMeters.PROCESSED, fifteenMinRate));
+ buildSegments.put(FIVE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED,
fiveMinRate));
+ buildSegments.put(ONE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED,
oneMinRate));
+
+ Map<String, Object> movingAverages = new HashMap<>();
+ movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments);
+
+ Map<String, Object> taskStats = new HashMap<>();
+ taskStats.put("movingAverages", movingAverages);
+ return taskStats;
+ }
+
+ private Map<String, Object> buildTaskStatsWithNullInterval(String
nullInterval, String validInterval, double processedRate)
+ {
+ Map<String, Object> buildSegments = new HashMap<>();
+ buildSegments.put(nullInterval, null);
+ buildSegments.put(validInterval, Map.of(RowIngestionMeters.PROCESSED,
processedRate));
+
+ Map<String, Object> movingAverages = new HashMap<>();
+ movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments);
+
+ Map<String, Object> taskStats = new HashMap<>();
+ taskStats.put("movingAverages", movingAverages);
+ return taskStats;
+ }
+
+ private Map<String, Object> buildTaskStatsWithTwoNullIntervals(double
oneMinRate)
+ {
+ Map<String, Object> buildSegments = new HashMap<>();
+ buildSegments.put(FIFTEEN_MINUTE_NAME, null);
+ buildSegments.put(FIVE_MINUTE_NAME, null);
+ buildSegments.put(ONE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED,
oneMinRate));
Map<String, Object> movingAverages = new HashMap<>();
movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
index 0c5602052d4..90b50477c92 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
@@ -47,11 +47,11 @@ public class WeightedCostFunctionTest
{
CostMetrics validMetrics = createMetrics(100000.0, 10, 100, 0.3);
- Assert.assertEquals(Double.POSITIVE_INFINITY,
costFunction.computeCost(null, 10, config), 0.0);
- Assert.assertEquals(Double.POSITIVE_INFINITY,
costFunction.computeCost(validMetrics, 10, null), 0.0);
- Assert.assertEquals(Double.POSITIVE_INFINITY,
costFunction.computeCost(validMetrics, 0, config), 0.0);
- Assert.assertEquals(Double.POSITIVE_INFINITY,
costFunction.computeCost(validMetrics, -5, config), 0.0);
- Assert.assertEquals(Double.POSITIVE_INFINITY,
costFunction.computeCost(createMetrics(0.0, 10, 0, 0.3), 10, config), 0.0);
+ Assert.assertEquals(Double.POSITIVE_INFINITY,
costFunction.computeCost(null, 10, config).totalCost(), 0.0);
+ Assert.assertEquals(Double.POSITIVE_INFINITY,
costFunction.computeCost(validMetrics, 10, null).totalCost(), 0.0);
+ Assert.assertEquals(Double.POSITIVE_INFINITY,
costFunction.computeCost(validMetrics, 0, config).totalCost(), 0.0);
+ Assert.assertEquals(Double.POSITIVE_INFINITY,
costFunction.computeCost(validMetrics, -5, config).totalCost(), 0.0);
+ Assert.assertEquals(Double.POSITIVE_INFINITY,
costFunction.computeCost(createMetrics(0.0, 10, 0, 0.3), 10,
config).totalCost(), 0.0);
}
@Test
@@ -68,8 +68,8 @@ public class WeightedCostFunctionTest
CostMetrics metrics = createMetrics(200000.0, 10, 200, 0.3);
- double costCurrent = costFunction.computeCost(metrics, 10, lagOnlyConfig);
- double costScaleDown = costFunction.computeCost(metrics, 5, lagOnlyConfig);
+ double costCurrent = costFunction.computeCost(metrics, 10,
lagOnlyConfig).totalCost();
+ double costScaleDown = costFunction.computeCost(metrics, 5,
lagOnlyConfig).totalCost();
// Scale down uses absolute model: lag / (5 * rate) = higher recovery time
// Current uses absolute model: lag / (10 * rate) = lower recovery time
@@ -97,15 +97,15 @@ public class WeightedCostFunctionTest
CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3);
// Current (10 tasks): uses absolute model = 10M / (10 * 1000) = 1000s
- double costCurrent = costFunction.computeCost(metrics, 10, lagOnlyConfig);
+ double costCurrent = costFunction.computeCost(metrics, 10,
lagOnlyConfig).totalCost();
Assert.assertEquals("Cost at current tasks", 1000., costCurrent, 0.1);
// Scale up by 5 (to 15): marginal model = 10M / (15 * 1000) = 666
- double costUp5 = costFunction.computeCost(metrics, 15, lagOnlyConfig);
+ double costUp5 = costFunction.computeCost(metrics, 15,
lagOnlyConfig).totalCost();
Assert.assertEquals("Cost when scaling up by 5", 666.7, costUp5, 0.1);
// Scale up by 10 (to 20): marginal model = 10M / (20 * 1000) = 500s
- double costUp10 = costFunction.computeCost(metrics, 20, lagOnlyConfig);
+ double costUp10 = costFunction.computeCost(metrics, 20,
lagOnlyConfig).totalCost();
Assert.assertEquals("Cost when scaling up by 10", 500.0, costUp10, 0.01);
// Adding more tasks reduces lag recovery time
@@ -120,8 +120,8 @@ public class WeightedCostFunctionTest
// This is intentional behavior: the algorithm is conservative about
scale-up.
CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3);
- double costCurrent = costFunction.computeCost(metrics, 10, config);
- double costScaleUp = costFunction.computeCost(metrics, 20, config);
+ double costCurrent = costFunction.computeCost(metrics, 10,
config).totalCost();
+ double costScaleUp = costFunction.computeCost(metrics, 20,
config).totalCost();
// With balanced weights (0.3 lag, 0.7 idle), the idle cost increase from
// scaling up dominates the lag recovery benefit
@@ -154,8 +154,8 @@ public class WeightedCostFunctionTest
CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.1);
- double costLag = costFunction.computeCost(metrics, 10, lagOnly);
- double costIdle = costFunction.computeCost(metrics, 10, idleOnly);
+ double costLag = costFunction.computeCost(metrics, 10,
lagOnly).totalCost();
+ double costIdle = costFunction.computeCost(metrics, 10,
idleOnly).totalCost();
Assert.assertNotEquals("Different weights should produce different costs",
costLag, costIdle, 0.0001);
Assert.assertTrue("Lag-only cost should be positive", costLag > 0.0);
@@ -170,9 +170,9 @@ public class WeightedCostFunctionTest
int currentTaskCount = 10;
CostMetrics metricsNoRate = createMetricsWithRate(50000.0,
currentTaskCount, 100, 0.3, 0.0);
- double costAtCurrent = costFunction.computeCost(metricsNoRate,
currentTaskCount, config);
- double costScaleUp = costFunction.computeCost(metricsNoRate,
currentTaskCount + 5, config);
- double costScaleDown = costFunction.computeCost(metricsNoRate,
currentTaskCount - 5, config);
+ double costAtCurrent = costFunction.computeCost(metricsNoRate,
currentTaskCount, config).totalCost();
+ double costScaleUp = costFunction.computeCost(metricsNoRate,
currentTaskCount + 5, config).totalCost();
+ double costScaleDown = costFunction.computeCost(metricsNoRate,
currentTaskCount - 5, config).totalCost();
Assert.assertTrue(
"Cost at current should be less than cost for scale up",
@@ -201,8 +201,8 @@ public class WeightedCostFunctionTest
.defaultProcessingRate(1000.0)
.build();
- double costUp5 = costFunction.computeCost(metricsNoRate, currentTaskCount
+ 5, lagOnlyConfig);
- double costDown5 = costFunction.computeCost(metricsNoRate,
currentTaskCount - 5, lagOnlyConfig);
+ double costUp5 = costFunction.computeCost(metricsNoRate, currentTaskCount
+ 5, lagOnlyConfig).totalCost();
+ double costDown5 = costFunction.computeCost(metricsNoRate,
currentTaskCount - 5, lagOnlyConfig).totalCost();
Assert.assertEquals(
"Lag cost for +5 and -5 deviation should be equal",
@@ -229,10 +229,10 @@ public class WeightedCostFunctionTest
// Current: 10 tasks with 40% idle (60% busy)
CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4);
- double costAt5 = costFunction.computeCost(metrics, 5, idleOnlyConfig);
- double costAt10 = costFunction.computeCost(metrics, 10, idleOnlyConfig);
- double costAt15 = costFunction.computeCost(metrics, 15, idleOnlyConfig);
- double costAt20 = costFunction.computeCost(metrics, 20, idleOnlyConfig);
+ double costAt5 = costFunction.computeCost(metrics, 5,
idleOnlyConfig).totalCost();
+ double costAt10 = costFunction.computeCost(metrics, 10,
idleOnlyConfig).totalCost();
+ double costAt15 = costFunction.computeCost(metrics, 15,
idleOnlyConfig).totalCost();
+ double costAt20 = costFunction.computeCost(metrics, 20,
idleOnlyConfig).totalCost();
// Monotonically increasing idle cost as tasks increase
Assert.assertTrue("cost(5) < cost(10)", costAt5 < costAt10);
@@ -256,7 +256,7 @@ public class WeightedCostFunctionTest
// busyFraction = 0.6, taskRatio = 0.2
// predictedIdle = 1 - 0.6/0.2 = 1 - 3 = -2 → clamped to 0
CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4);
- double costAt2 = costFunction.computeCost(metrics, 2, idleOnlyConfig);
+ double costAt2 = costFunction.computeCost(metrics, 2,
idleOnlyConfig).totalCost();
// idlenessCost = taskCount * taskDuration * 0.0 (clamped) = 0
Assert.assertEquals("Idle cost should be 0 when predicted idle is clamped
to 0", 0.0, costAt2, 0.0001);
@@ -266,7 +266,7 @@ public class WeightedCostFunctionTest
// busyFraction = 0.9, taskRatio = 10
// predictedIdle = 1 - 0.9/10 = 1 - 0.09 = 0.91 (within bounds)
CostMetrics lowIdle = createMetrics(0.0, 10, 100, 0.1);
- double costAt100 = costFunction.computeCost(lowIdle, 100, idleOnlyConfig);
+ double costAt100 = costFunction.computeCost(lowIdle, 100,
idleOnlyConfig).totalCost();
// idlenessCost = 100 * 3600 * 0.91 = 327600
Assert.assertTrue("Cost should be finite and positive",
Double.isFinite(costAt100) && costAt100 > 0);
}
@@ -286,8 +286,8 @@ public class WeightedCostFunctionTest
// Negative idle ratio indicates missing data → should default to 0.5
CostMetrics missingIdleData = createMetrics(0.0, 10, 100, -1.0);
- double cost10 = costFunction.computeCost(missingIdleData, 10,
idleOnlyConfig);
- double cost20 = costFunction.computeCost(missingIdleData, 20,
idleOnlyConfig);
+ double cost10 = costFunction.computeCost(missingIdleData, 10,
idleOnlyConfig).totalCost();
+ double cost20 = costFunction.computeCost(missingIdleData, 20,
idleOnlyConfig).totalCost();
// With missing data, predicted idle = 0.5 for all task counts
// idlenessCost at 10 = 10 * 3600 * 0.5 = 18000
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 30a0d7a723e..ce6c87e5e08 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import javax.annotation.Nullable;
@@ -86,6 +87,11 @@ public interface Supervisor
return null; // default implementation for interface compatibility;
returning null since true or false is misleading
}
+ default SupervisorTaskAutoScaler createAutoscaler(SupervisorSpec spec)
+ {
+ return spec.createAutoscaler(this);
+ }
+
/**
* Resets any stored metadata by the supervisor.
* @param dataSourceMetadata optional dataSource metadata.
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
index c921e2740b8..17cd347231a 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
@@ -24,4 +24,15 @@ public interface SupervisorTaskAutoScaler
void start();
void stop();
void reset();
+
+ /**
+ * Computes the optimal task count during task rollover, allowing a
non-disruptive scale-down.
+ * Must be called by the supervisor when tasks reach the end of their duration.
+ *
+ * @return optimal task count for scale-down, or -1 if no change needed
+ */
+ default int computeTaskCountForRollover()
+ {
+ return -1;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]