This is an automated email from the ASF dual-hosted git repository.
capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ed0264a54d7 Add configurable option to scale-down during task run time
for cost-based autoscaler (#18958)
ed0264a54d7 is described below
commit ed0264a54d74b4de318a214c58090756905cfc00
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Wed Jan 28 17:14:37 2026 +0200
Add configurable option to scale-down during task run time for cost-based
autoscaler (#18958)
* Add configurable option to scale-down during task run time for cost-based
autoscaler
* Docs
* Address review comments, compress tests a bit
---
docs/ingestion/supervisor.md | 2 +
.../CostBasedAutoScalerIntegrationTest.java | 5 +
.../supervisor/autoscaler/CostBasedAutoScaler.java | 20 ++-
.../autoscaler/CostBasedAutoScalerConfig.java | 70 ++++++--
.../autoscaler/CostBasedAutoScalerConfigTest.java | 62 ++++++-
.../autoscaler/CostBasedAutoScalerMockTest.java | 140 +++++++++++++--
.../autoscaler/CostBasedAutoScalerTest.java | 197 +++++++--------------
7 files changed, 332 insertions(+), 164 deletions(-)
diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 26a3aaea71b..b6be49cec9b 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -208,6 +208,8 @@ The following table outlines the configuration properties
related to the `costBa
|`lagWeight`|The weight of extracted lag value in cost function.| No| 0.25|
|`idleWeight`|The weight of extracted poll idle value in cost function. | No |
0.75 |
|`defaultProcessingRate`|A planned processing rate per task, required for
first cost estimations. | No | 1000 |
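+|`scaleDownBarrier`| Number of consecutive scale-action checks that must recommend scaling down before the auto-scaler actually reduces the task count. The count resets after a scale-up, an applied scale-down, or a task rollover; `0` and `1` both allow scaling down on the first recommendation. Only used when `scaleDownDuringTaskRolloverOnly` is `false`. | No | 5 |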
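+|`scaleDownDuringTaskRolloverOnly`| When `true`, the auto-scaler reduces the task count only at task rollover. When `false`, scale-down happens instead during regular scale-action checks, gated by `scaleDownBarrier`, and rollover does not change the task count. | No | `false` |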
The following example shows a supervisor spec with `lagBased` autoscaler:
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
index 140c4626d22..ee9712f8f82 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
@@ -132,6 +132,8 @@ public class CostBasedAutoScalerIntegrationTest extends
EmbeddedClusterTestBase
// Weight configuration: strongly favor lag reduction over idle time
.lagWeight(0.9)
.idleWeight(0.1)
+ .scaleDownDuringTaskRolloverOnly(false)
+ .scaleDownBarrier(1)
.build();
final KafkaSupervisorSpec spec =
createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig,
initialTaskCount);
@@ -225,6 +227,9 @@ public class CostBasedAutoScalerIntegrationTest extends
EmbeddedClusterTestBase
// High idle weight ensures scale-down when tasks are mostly idle
(little data to process)
.lagWeight(0.1)
.idleWeight(0.9)
+ .scaleDownDuringTaskRolloverOnly(false)
+ // Do not slow scale-downs
+ .scaleDownBarrier(0)
.build();
final KafkaSupervisorSpec spec =
createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig,
initialTaskCount);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 350e2284359..dcdeea0ccd0 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -90,6 +90,8 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
private final WeightedCostFunction costFunction;
private volatile CostMetrics lastKnownMetrics;
+ private int scaleDownCounter = 0;
+
public CostBasedAutoScaler(
SeekableStreamSupervisor supervisor,
CostBasedAutoScalerConfig config,
@@ -148,7 +150,12 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
@Override
public int computeTaskCountForRollover()
{
- return computeOptimalTaskCount(lastKnownMetrics);
+ if (config.isScaleDownOnTaskRolloverOnly()) {
+ return computeOptimalTaskCount(lastKnownMetrics);
+ } else {
+ scaleDownCounter = 0;
+ return -1;
+ }
}
public int computeTaskCountForScaleAction()
@@ -157,11 +164,18 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
final int currentTaskCount = lastKnownMetrics.getCurrentTaskCount();
- // Perform only scale-up actions
+ // Perform scale-up actions; scale-down actions only if configured.
int taskCount = -1;
if (optimalTaskCount > currentTaskCount) {
taskCount = optimalTaskCount;
- log.info("New task count [%d] on supervisor [%s]", taskCount,
supervisorId);
+ scaleDownCounter = 0; // Nullify the scaleDown counter after a
successful scaleup too.
+ log.info("New task count [%d] on supervisor [%s], scaling up",
taskCount, supervisorId);
+ } else if (!config.isScaleDownOnTaskRolloverOnly()
+ && optimalTaskCount < currentTaskCount
+ && ++scaleDownCounter >= config.getScaleDownBarrier()) {
+ taskCount = optimalTaskCount;
+ scaleDownCounter = 0;
+ log.info("New task count [%d] on supervisor [%s], scaling down",
taskCount, supervisorId);
} else {
log.info("No scaling required for supervisor [%s]", supervisorId);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
index aba26ba25b5..d835fea5157 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
@@ -43,11 +43,12 @@ import java.util.Objects;
@JsonInclude(JsonInclude.Include.NON_NULL)
public class CostBasedAutoScalerConfig implements AutoScalerConfig
{
- private static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 15 * 60 *
1000; // 15 minutes
- private static final long DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS
= 1200000; // 20 minutes
- private static final double DEFAULT_LAG_WEIGHT = 0.25;
- private static final double DEFAULT_IDLE_WEIGHT = 0.75;
- private static final double DEFAULT_PROCESSING_RATE = 1000.0; // 1000
records/sec per task as default
+ static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 10 * 60 * 1000; // 10
minutes
+ static final long DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS = 5 * 60
* 1000; // 5 minutes
+ static final double DEFAULT_LAG_WEIGHT = 0.25;
+ static final double DEFAULT_IDLE_WEIGHT = 0.75;
+ static final double DEFAULT_PROCESSING_RATE = 1000.0; // 1000 records/sec
per task as default
+ static final int DEFAULT_SCALE_DOWN_BARRIER = 5; // We delay scale down by 5
* DEFAULT_SCALE_ACTION_PERIOD_MILLIS
private final boolean enableTaskAutoScaler;
private final int taskCountMax;
@@ -60,6 +61,18 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
private final double lagWeight;
private final double idleWeight;
private final double defaultProcessingRate;
+ /**
+ * Number of consecutive scale-action checks that must recommend a scale-down before the task count is reduced
+ * ({@code 0} or {@code 1} means scaling down on the first recommendation). Only used when {@code scaleDownDuringTaskRolloverOnly} is false.
+ * The counter resets after a scale-up, after an applied scale-down, and at task rollover. A higher value implies
+ * more conservative scale-down behavior, so tasks are not terminated in response to short-lived dips in load.
+ */
+ private final int scaleDownBarrier;
+ /**
+ * If {@code true}, the task count is reduced only at task rollover. If {@code false}, scale-down happens during
+ * regular scale-action checks (gated by {@code scaleDownBarrier}) and rollover makes no task-count recommendation.
+ */
+ private final boolean scaleDownDuringTaskRolloverOnly;
@JsonCreator
public CostBasedAutoScalerConfig(
@@ -72,7 +85,9 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
@Nullable @JsonProperty("scaleActionPeriodMillis") Long
scaleActionPeriodMillis,
@Nullable @JsonProperty("lagWeight") Double lagWeight,
@Nullable @JsonProperty("idleWeight") Double idleWeight,
- @Nullable @JsonProperty("defaultProcessingRate") Double
defaultProcessingRate
+ @Nullable @JsonProperty("defaultProcessingRate") Double
defaultProcessingRate,
+ @Nullable @JsonProperty("scaleDownBarrier") Integer scaleDownBarrier,
+ @Nullable @JsonProperty("scaleDownDuringTaskRolloverOnly") Boolean
scaleDownDuringTaskRolloverOnly
)
{
this.enableTaskAutoScaler = enableTaskAutoScaler != null ?
enableTaskAutoScaler : false;
@@ -90,6 +105,8 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
this.lagWeight = Configs.valueOrDefault(lagWeight, DEFAULT_LAG_WEIGHT);
this.idleWeight = Configs.valueOrDefault(idleWeight, DEFAULT_IDLE_WEIGHT);
this.defaultProcessingRate = Configs.valueOrDefault(defaultProcessingRate,
DEFAULT_PROCESSING_RATE);
+ this.scaleDownBarrier = Configs.valueOrDefault(scaleDownBarrier,
DEFAULT_SCALE_DOWN_BARRIER);
+ this.scaleDownDuringTaskRolloverOnly =
Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false);
if (this.enableTaskAutoScaler) {
Preconditions.checkNotNull(taskCountMax, "taskCountMax is required when
enableTaskAutoScaler is true");
@@ -107,17 +124,16 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
}
this.taskCountStart = taskCountStart;
- // Validate stopTaskCountRatio
Preconditions.checkArgument(
stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 &&
stopTaskCountRatio <= 1.0),
"0.0 < stopTaskCountRatio <= 1.0"
);
this.stopTaskCountRatio = stopTaskCountRatio;
- // Validate weights are non-negative
Preconditions.checkArgument(this.lagWeight >= 0, "lagWeight must be >= 0");
Preconditions.checkArgument(this.idleWeight >= 0, "idleWeight must be >=
0");
Preconditions.checkArgument(this.defaultProcessingRate > 0,
"defaultProcessingRate must be > 0");
+ Preconditions.checkArgument(this.scaleDownBarrier >= 0, "scaleDownBarrier
must be >= 0");
}
/**
@@ -196,6 +212,18 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
return defaultProcessingRate;
}
+ @JsonProperty
+ public int getScaleDownBarrier()
+ {
+ return scaleDownBarrier;
+ }
+
+ @JsonProperty("scaleDownDuringTaskRolloverOnly")
+ public boolean isScaleDownOnTaskRolloverOnly()
+ {
+ return scaleDownDuringTaskRolloverOnly;
+ }
+
@Override
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor,
SupervisorSpec spec, ServiceEmitter emitter)
{
@@ -222,6 +250,8 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
&& Double.compare(that.lagWeight, lagWeight) == 0
&& Double.compare(that.idleWeight, idleWeight) == 0
&& Double.compare(that.defaultProcessingRate,
defaultProcessingRate) == 0
+ && scaleDownBarrier == that.scaleDownBarrier
+ && scaleDownDuringTaskRolloverOnly ==
that.scaleDownDuringTaskRolloverOnly
&& Objects.equals(taskCountStart, that.taskCountStart)
&& Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio);
}
@@ -239,7 +269,9 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
scaleActionPeriodMillis,
lagWeight,
idleWeight,
- defaultProcessingRate
+ defaultProcessingRate,
+ scaleDownBarrier,
+ scaleDownDuringTaskRolloverOnly
);
}
@@ -257,6 +289,8 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
", lagWeight=" + lagWeight +
", idleWeight=" + idleWeight +
", defaultProcessingRate=" + defaultProcessingRate +
+ ", scaleDownBarrier=" + scaleDownBarrier +
+ ", scaleDownDuringTaskRolloverOnly=" +
scaleDownDuringTaskRolloverOnly +
'}';
}
@@ -276,6 +310,8 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
private Double lagWeight;
private Double idleWeight;
private Double defaultProcessingRate;
+ private Integer scaleDownBarrier;
+ private Boolean scaleDownDuringTaskRolloverOnly;
private Builder()
{
@@ -341,6 +377,18 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
return this;
}
+ public Builder scaleDownBarrier(int scaleDownBarrier)
+ {
+ this.scaleDownBarrier = scaleDownBarrier;
+ return this;
+ }
+
+ public Builder scaleDownDuringTaskRolloverOnly(boolean
scaleDownDuringTaskRolloverOnly)
+ {
+ this.scaleDownDuringTaskRolloverOnly = scaleDownDuringTaskRolloverOnly;
+ return this;
+ }
+
public CostBasedAutoScalerConfig build()
{
return new CostBasedAutoScalerConfig(
@@ -353,7 +401,9 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
scaleActionPeriodMillis,
lagWeight,
idleWeight,
- defaultProcessingRate
+ defaultProcessingRate,
+ scaleDownBarrier,
+ scaleDownDuringTaskRolloverOnly
);
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
index b585667adcf..364e0d75b80 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
@@ -24,6 +24,13 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
+import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_IDLE_WEIGHT;
+import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_LAG_WEIGHT;
+import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS;
+import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_PROCESSING_RATE;
+import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_SCALE_ACTION_PERIOD_MILLIS;
+import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_SCALE_DOWN_BARRIER;
+
public class CostBasedAutoScalerConfigTest
{
private final ObjectMapper mapper = new DefaultObjectMapper();
@@ -42,7 +49,9 @@ public class CostBasedAutoScalerConfigTest
+ " \"scaleActionPeriodMillis\": 60000,\n"
+ " \"lagWeight\": 0.6,\n"
+ " \"idleWeight\": 0.4,\n"
- + " \"distancePenaltyExponent\": 3.0\n"
+ + " \"defaultProcessingRate\": 2000.0,\n"
+ + " \"scaleDownBarrier\": 10,\n"
+ + " \"scaleDownDuringTaskRolloverOnly\": true\n"
+ "}";
CostBasedAutoScalerConfig config = mapper.readValue(json,
CostBasedAutoScalerConfig.class);
@@ -56,6 +65,9 @@ public class CostBasedAutoScalerConfigTest
Assert.assertEquals(60000L, config.getScaleActionPeriodMillis());
Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
+ Assert.assertEquals(2000.0, config.getDefaultProcessingRate(), 0.001);
+ Assert.assertEquals(10, config.getScaleDownBarrier());
+ Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
// Test serialization back to JSON
String serialized = mapper.writeValueAsString(config);
@@ -81,10 +93,15 @@ public class CostBasedAutoScalerConfigTest
Assert.assertEquals(2, config.getTaskCountMin());
// Check defaults
- Assert.assertEquals(900000L, config.getScaleActionPeriodMillis());
- Assert.assertEquals(1200000L,
config.getMinTriggerScaleActionFrequencyMillis());
- Assert.assertEquals(0.25, config.getLagWeight(), 0.001);
- Assert.assertEquals(0.75, config.getIdleWeight(), 0.001);
+ Assert.assertEquals(DEFAULT_SCALE_ACTION_PERIOD_MILLIS,
config.getScaleActionPeriodMillis());
+ Assert.assertEquals(DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS,
config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(DEFAULT_LAG_WEIGHT, config.getLagWeight(), 0.001);
+ Assert.assertEquals(DEFAULT_IDLE_WEIGHT, config.getIdleWeight(), 0.001);
+ Assert.assertEquals(DEFAULT_PROCESSING_RATE,
config.getDefaultProcessingRate(), 0.001);
+ Assert.assertEquals(DEFAULT_SCALE_DOWN_BARRIER,
config.getScaleDownBarrier());
+ Assert.assertFalse(config.isScaleDownOnTaskRolloverOnly());
+ Assert.assertNull(config.getTaskCountStart());
+ Assert.assertNull(config.getStopTaskCountRatio());
}
@Test
@@ -98,6 +115,9 @@ public class CostBasedAutoScalerConfigTest
CostBasedAutoScalerConfig config = mapper.readValue(json,
CostBasedAutoScalerConfig.class);
Assert.assertFalse(config.getEnableTaskAutoScaler());
+ // When disabled, taskCountMax and taskCountMin default to 0
+ Assert.assertEquals(0, config.getTaskCountMax());
+ Assert.assertEquals(0, config.getTaskCountMin());
}
@Test(expected = RuntimeException.class)
@@ -149,4 +169,36 @@ public class CostBasedAutoScalerConfigTest
.enableTaskAutoScaler(true)
.build();
}
+
+ @Test
+ public void testBuilder()
+ {
+ CostBasedAutoScalerConfig config = CostBasedAutoScalerConfig.builder()
+
.taskCountMax(100)
+
.taskCountMin(5)
+
.taskCountStart(10)
+
.enableTaskAutoScaler(true)
+
.minTriggerScaleActionFrequencyMillis(600000L)
+
.stopTaskCountRatio(0.8)
+
.scaleActionPeriodMillis(60000L)
+
.lagWeight(0.6)
+
.idleWeight(0.4)
+
.defaultProcessingRate(2000.0)
+
.scaleDownBarrier(10)
+
.scaleDownDuringTaskRolloverOnly(true)
+ .build();
+
+ Assert.assertTrue(config.getEnableTaskAutoScaler());
+ Assert.assertEquals(100, config.getTaskCountMax());
+ Assert.assertEquals(5, config.getTaskCountMin());
+ Assert.assertEquals(Integer.valueOf(10), config.getTaskCountStart());
+ Assert.assertEquals(600000L,
config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(Double.valueOf(0.8), config.getStopTaskCountRatio());
+ Assert.assertEquals(60000L, config.getScaleActionPeriodMillis());
+ Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
+ Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
+ Assert.assertEquals(2000.0, config.getDefaultProcessingRate(), 0.001);
+ Assert.assertEquals(10, config.getScaleDownBarrier());
+ Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
index 4147a50163d..becfd4964b4 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
@@ -77,23 +77,47 @@ public class CostBasedAutoScalerMockTest
@Test
public void testScaleUpWhenOptimalGreaterThanCurrent()
{
- // Setup: current = 10, optimal should be higher due to high lag and low
idle
- CostBasedAutoScaler autoScaler = spy(new
CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
+ // Use config with barrier=2 to test counter reset behavior
+ CostBasedAutoScalerConfig barrierConfig =
CostBasedAutoScalerConfig.builder()
+
.taskCountMax(100)
+
.taskCountMin(1)
+
.enableTaskAutoScaler(true)
+
.scaleDownBarrier(2)
+
.build();
+
+ CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
+ mockSupervisor,
+ barrierConfig,
+ mockSpec,
+ mockEmitter
+ ));
int currentTaskCount = 10;
- int expectedOptimalCount = 17; // Higher than current
+ int scaleUpOptimal = 17;
+ int scaleDownOptimal = 5;
- CostMetrics metrics = createMetrics(5000.0, currentTaskCount,
PARTITION_COUNT, 0.1);
+ // First, increment the scaleDownCounter by making a scale-down attempt
+ doReturn(scaleDownOptimal).when(autoScaler).computeOptimalTaskCount(any());
+ setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
+ Assert.assertEquals("Scale-down blocked (counter=1)", -1,
autoScaler.computeTaskCountForScaleAction());
-
doReturn(expectedOptimalCount).when(autoScaler).computeOptimalTaskCount(any());
+ // Now trigger scale-up, which should reset the counter
+ doReturn(scaleUpOptimal).when(autoScaler).computeOptimalTaskCount(any());
setupMocksForMetricsCollection(currentTaskCount, 5000.0, 0.1);
- int result = autoScaler.computeTaskCountForScaleAction();
-
Assert.assertEquals(
"Should return optimal count when it's greater than current
(scale-up)",
- expectedOptimalCount,
- result
+ scaleUpOptimal,
+ autoScaler.computeTaskCountForScaleAction()
+ );
+
+ // Verify counter was reset: scale-down should be blocked again (counter
starts from 0)
+ doReturn(scaleDownOptimal).when(autoScaler).computeOptimalTaskCount(any());
+ setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
+ Assert.assertEquals(
+ "Scale-down should be blocked after scale-up reset the counter",
+ -1,
+ autoScaler.computeTaskCountForScaleAction()
);
}
@@ -116,8 +140,20 @@ public class CostBasedAutoScalerMockTest
@Test
public void testScaleDownBlockedReturnsMinusOne()
{
- // Scale-down is blocked in computeTaskCountForScaleAction
- CostBasedAutoScaler autoScaler = spy(new
CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
+ // Use config with barrier=2 to test counter behavior
+ CostBasedAutoScalerConfig barrierConfig =
CostBasedAutoScalerConfig.builder()
+
.taskCountMax(100)
+
.taskCountMin(1)
+
.enableTaskAutoScaler(true)
+
.scaleDownBarrier(2)
+
.build();
+
+ CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
+ mockSupervisor,
+ barrierConfig,
+ mockSpec,
+ mockEmitter
+ ));
int currentTaskCount = 50;
int optimalCount = 30; // Lower than current (scale-down scenario)
@@ -125,12 +161,25 @@ public class CostBasedAutoScalerMockTest
doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
- int result = autoScaler.computeTaskCountForScaleAction();
+ // First attempt: counter=1, blocked
+ Assert.assertEquals(
+ "Should return -1 when optimal is less than current (scale-down
blocked, counter=1)",
+ -1,
+ autoScaler.computeTaskCountForScaleAction()
+ );
+
+ // Second attempt: counter=2, succeeds (barrier reached)
+ Assert.assertEquals(
+ "Scale-down should succeed when barrier reached",
+ optimalCount,
+ autoScaler.computeTaskCountForScaleAction()
+ );
+ // Verify counter was reset: next scale-down should be blocked again
Assert.assertEquals(
- "Should return -1 when optimal is less than current (scale-down
blocked)",
+ "Scale-down should be blocked after successful scale-down reset the
counter",
-1,
- result
+ autoScaler.computeTaskCountForScaleAction()
);
}
@@ -257,6 +306,69 @@ public class CostBasedAutoScalerMockTest
);
}
+ @Test
+ public void testScaleDownBlockedWhenScaleDownOnRolloverOnlyEnabled()
+ {
+ CostBasedAutoScalerConfig rolloverOnlyConfig =
CostBasedAutoScalerConfig.builder()
+
.taskCountMax(100)
+
.taskCountMin(1)
+
.enableTaskAutoScaler(true)
+
.scaleDownDuringTaskRolloverOnly(true)
+
.scaleDownBarrier(1) // Set the barrier to 1 so it would trigger immediately if
not blocked
+
.build();
+
+ CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
+ mockSupervisor,
+ rolloverOnlyConfig,
+ mockSpec,
+ mockEmitter
+ ));
+
+ int currentTaskCount = 50;
+ int optimalCount = 30; // Lower than current (scale-down scenario)
+
+ doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
+ setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
+
+ Assert.assertEquals(
+ "Should return -1 when scaleDownDuringTaskRolloverOnly is true",
+ -1,
+ autoScaler.computeTaskCountForScaleAction()
+ );
+ }
+
+ @Test
+ public void
testScaleDownAllowedDuringRolloverWhenScaleDownOnRolloverOnlyEnabled()
+ {
+ CostBasedAutoScalerConfig rolloverOnlyConfig =
CostBasedAutoScalerConfig.builder()
+
.taskCountMax(100)
+
.taskCountMin(1)
+
.enableTaskAutoScaler(true)
+
.scaleDownDuringTaskRolloverOnly(true)
+
.build();
+
+ CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
+ mockSupervisor,
+ rolloverOnlyConfig,
+ mockSpec,
+ mockEmitter
+ ));
+
+ int currentTaskCount = 50;
+ int optimalCount = 30;
+
+ // Set up lastKnownMetrics by calling computeTaskCountForScaleAction first
+ doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
+ setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
+ autoScaler.computeTaskCountForScaleAction(); // This populates
lastKnownMetrics
+
+ Assert.assertEquals(
+ "Should scale-down during rollover when
scaleDownDuringTaskRolloverOnly is true",
+ optimalCount,
+ autoScaler.computeTaskCountForRollover()
+ );
+ }
+
private void setupMocksForMetricsCollection(int taskCount, double avgLag,
double pollIdleRatio)
{
when(mockSupervisor.computeLagStats()).thenReturn(new LagStats(0, (long)
avgLag * 2, (long) avgLag));
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 54299d915f6..23192900ec0 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -94,11 +94,8 @@ public class CostBasedAutoScalerTest
Assert.assertEquals(2, exceedsPartitions.length);
Assert.assertTrue(contains(exceedsPartitions, 1));
Assert.assertTrue(contains(exceedsPartitions, 2));
- }
- @Test
- public void testComputeValidTaskCountsLagExpansion()
- {
+ // Lag expansion: low lag should not include max, high lag should
int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 30);
Assert.assertFalse("Low lag should not include max task count",
contains(lowLagCounts, 30));
Assert.assertTrue("Low lag should cap scale up around 4 tasks",
contains(lowLagCounts, 4));
@@ -106,12 +103,8 @@ public class CostBasedAutoScalerTest
long highAggregateLag = 30L * 500_000L;
int[] highLagCounts = computeValidTaskCounts(30, 3, highAggregateLag, 30);
Assert.assertTrue("High lag should allow scaling to max tasks",
contains(highLagCounts, 30));
- }
- @Test
- public void testComputeValidTaskCountsRespectsTaskCountMax()
- {
- long highAggregateLag = 30L * 500_000L;
+ // Respects taskCountMax
int[] cappedCounts = computeValidTaskCounts(30, 4, highAggregateLag, 3);
Assert.assertTrue("Should include taskCountMax when doable",
contains(cappedCounts, 3));
Assert.assertFalse("Should not exceed taskCountMax",
contains(cappedCounts, 4));
@@ -125,6 +118,31 @@ public class CostBasedAutoScalerTest
double pollIdleRatio = 0.1;
double avgProcessingRate = 10.0;
+ // Create a local autoScaler with taskCountMax matching the test parameters
+ SupervisorSpec mockSpec = Mockito.mock(SupervisorSpec.class);
+ SeekableStreamSupervisor mockSupervisor =
Mockito.mock(SeekableStreamSupervisor.class);
+ ServiceEmitter mockEmitter = Mockito.mock(ServiceEmitter.class);
+ SeekableStreamSupervisorIOConfig mockIoConfig =
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+ when(mockSpec.getId()).thenReturn("test-supervisor");
+ when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
+ when(mockIoConfig.getStream()).thenReturn("test-stream");
+
+ CostBasedAutoScalerConfig localConfig = CostBasedAutoScalerConfig.builder()
+
.taskCountMax(taskCountMax)
+
.taskCountMin(1)
+
.enableTaskAutoScaler(true)
+
.lagWeight(0.6)
+
.idleWeight(0.4)
+ .build();
+
+ CostBasedAutoScaler localAutoScaler = new CostBasedAutoScaler(
+ mockSupervisor,
+ localConfig,
+ mockSpec,
+ mockEmitter
+ );
+
class Example
{
final int currentTasks;
@@ -165,7 +183,7 @@ public class CostBasedAutoScalerTest
pollIdleRatio,
avgProcessingRate
);
- int actualOptimal = autoScaler.computeOptimalTaskCount(metrics);
+ int actualOptimal = localAutoScaler.computeOptimalTaskCount(metrics);
if (actualOptimal == -1) {
actualOptimal = example.currentTasks;
}
@@ -193,17 +211,14 @@ public class CostBasedAutoScalerTest
}
@Test
- public void testComputeOptimalTaskCountInvalidInputs()
+ public void testComputeOptimalTaskCount()
{
+ // Invalid inputs return -1
Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(null));
Assert.assertEquals(-1,
autoScaler.computeOptimalTaskCount(createMetrics(0.0, 10, 0, 0.0)));
Assert.assertEquals(-1,
autoScaler.computeOptimalTaskCount(createMetrics(100.0, 10, -5, 0.3)));
Assert.assertEquals(-1,
autoScaler.computeOptimalTaskCount(createMetrics(100.0, -1, 100, 0.3)));
- }
- @Test
- public void testComputeOptimalTaskCountScaling()
- {
// High idle (underutilized) - should scale down
int scaleDownResult =
autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8));
Assert.assertTrue("Should scale down when idle > 0.6", scaleDownResult <
25);
@@ -236,31 +251,27 @@ public class CostBasedAutoScalerTest
group.put("task-1", buildTaskStatsWithPollIdle(0.5));
validStats.put("0", group);
Assert.assertEquals(0.4,
CostBasedAutoScaler.extractPollIdleRatio(validStats), 0.0001);
- }
- @Test
- public void testExtractPollIdleRatioInvalidTypes()
- {
- // Non-map task metric
+ // Invalid types: non-map task metric
Map<String, Map<String, Object>> nonMapTask = new HashMap<>();
nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
Assert.assertEquals(0.,
CostBasedAutoScaler.extractPollIdleRatio(nonMapTask), 0.0001);
- // Empty autoscaler metrics
+ // Invalid types: empty autoscaler metrics
Map<String, Map<String, Object>> emptyAutoscaler = new HashMap<>();
Map<String, Object> taskStats1 = new HashMap<>();
taskStats1.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, new
HashMap<>());
emptyAutoscaler.put("0", Collections.singletonMap("task-0", taskStats1));
Assert.assertEquals(0.,
CostBasedAutoScaler.extractPollIdleRatio(emptyAutoscaler), 0.0001);
- // Non-map autoscaler metrics
+ // Invalid types: non-map autoscaler metrics
Map<String, Map<String, Object>> nonMapAutoscaler = new HashMap<>();
Map<String, Object> taskStats2 = new HashMap<>();
taskStats2.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY,
"not-a-map");
nonMapAutoscaler.put("0", Collections.singletonMap("task-0", taskStats2));
Assert.assertEquals(0.,
CostBasedAutoScaler.extractPollIdleRatio(nonMapAutoscaler), 0.0001);
- // Non-number poll idle ratio
+ // Invalid types: non-number poll idle ratio
Map<String, Map<String, Object>> nonNumberRatio = new HashMap<>();
Map<String, Object> taskStats3 = new HashMap<>();
Map<String, Object> autoscalerMetrics = new HashMap<>();
@@ -289,20 +300,10 @@ public class CostBasedAutoScalerTest
group.put("task-1", buildTaskStatsWithMovingAverage(2000.0));
validStats.put("0", group);
Assert.assertEquals(1500.0,
CostBasedAutoScaler.extractMovingAverage(validStats), 0.0001);
- }
- @Test
- public void testExtractMovingAverageIntervalFallback()
- {
- // 15-minute average is preferred
+ // Interval fallback: 15-minute preferred, then 5-minute, then 1-minute
Map<String, Map<String, Object>> fifteenMin = new HashMap<>();
- fifteenMin.put(
- "0",
- Collections.singletonMap(
- "task-0",
- buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME,
1500.0)
- )
- );
+ fifteenMin.put("0", Collections.singletonMap("task-0",
buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 1500.0)));
Assert.assertEquals(1500.0,
CostBasedAutoScaler.extractMovingAverage(fifteenMin), 0.0001);
// 1-minute as a final fallback
@@ -334,56 +335,21 @@ public class CostBasedAutoScalerTest
nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
Assert.assertEquals(-1.,
CostBasedAutoScaler.extractMovingAverage(nonMapTask), 0.0001);
- // Missing buildSegments
Map<String, Map<String, Object>> missingBuild = new HashMap<>();
Map<String, Object> taskStats1 = new HashMap<>();
taskStats1.put("movingAverages", new HashMap<>());
missingBuild.put("0", Collections.singletonMap("task-0", taskStats1));
Assert.assertEquals(-1.,
CostBasedAutoScaler.extractMovingAverage(missingBuild), 0.0001);
- // Non-map movingAverages
Map<String, Map<String, Object>> nonMapMA = new HashMap<>();
Map<String, Object> taskStats2 = new HashMap<>();
taskStats2.put("movingAverages", "not-a-map");
nonMapMA.put("0", Collections.singletonMap("task-0", taskStats2));
Assert.assertEquals(-1.,
CostBasedAutoScaler.extractMovingAverage(nonMapMA), 0.0001);
-
- // Non-map buildSegments
- Map<String, Map<String, Object>> nonMapBS = new HashMap<>();
- Map<String, Object> taskStats3 = new HashMap<>();
- Map<String, Object> movingAverages3 = new HashMap<>();
- movingAverages3.put(RowIngestionMeters.BUILD_SEGMENTS, "not-a-map");
- taskStats3.put("movingAverages", movingAverages3);
- nonMapBS.put("0", Collections.singletonMap("task-0", taskStats3));
- Assert.assertEquals(-1.,
CostBasedAutoScaler.extractMovingAverage(nonMapBS), 0.0001);
-
- // Non-map interval data
- Map<String, Map<String, Object>> nonMapInterval = new HashMap<>();
- Map<String, Object> taskStats4 = new HashMap<>();
- Map<String, Object> movingAverages4 = new HashMap<>();
- Map<String, Object> buildSegments4 = new HashMap<>();
- buildSegments4.put(FIFTEEN_MINUTE_NAME, "not-a-map");
- movingAverages4.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments4);
- taskStats4.put("movingAverages", movingAverages4);
- nonMapInterval.put("0", Collections.singletonMap("task-0", taskStats4));
- Assert.assertEquals(-1.,
CostBasedAutoScaler.extractMovingAverage(nonMapInterval), 0.0001);
-
- // Non-number processed rate
- Map<String, Map<String, Object>> nonNumberRate = new HashMap<>();
- Map<String, Object> taskStats5 = new HashMap<>();
- Map<String, Object> movingAverages5 = new HashMap<>();
- Map<String, Object> buildSegments5 = new HashMap<>();
- Map<String, Object> fifteenMin = new HashMap<>();
- fifteenMin.put(RowIngestionMeters.PROCESSED, "not-a-number");
- buildSegments5.put(FIFTEEN_MINUTE_NAME, fifteenMin);
- movingAverages5.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments5);
- taskStats5.put("movingAverages", movingAverages5);
- nonNumberRate.put("0", Collections.singletonMap("task-0", taskStats5));
- Assert.assertEquals(-1.,
CostBasedAutoScaler.extractMovingAverage(nonNumberRate), 0.0001);
}
@Test
- public void testComputeTaskCountForRolloverReturnsMinusOneWhenSuspended()
+ public void testComputeTaskCountForRolloverAndConfigProperties()
{
SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
SeekableStreamSupervisor supervisor =
Mockito.mock(SeekableStreamSupervisor.class);
@@ -391,73 +357,40 @@ public class CostBasedAutoScalerTest
SeekableStreamSupervisorIOConfig ioConfig =
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
when(spec.getId()).thenReturn("s-up");
- when(spec.isSuspended()).thenReturn(true);
when(supervisor.getIoConfig()).thenReturn(ioConfig);
when(ioConfig.getStream()).thenReturn("stream");
- CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder()
- .taskCountMax(10)
- .taskCountMin(1)
-
.enableTaskAutoScaler(true)
- .lagWeight(0.5)
- .idleWeight(0.5)
- .build();
-
- CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg,
spec, emitter);
- Assert.assertEquals(-1, scaler.computeTaskCountForRollover());
- }
-
- @Test
- public void testComputeTaskCountForRolloverReturnsMinusOneWhenLagStatsNull()
- {
- SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
- SeekableStreamSupervisor supervisor =
Mockito.mock(SeekableStreamSupervisor.class);
- ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
- SeekableStreamSupervisorIOConfig ioConfig =
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
-
- when(spec.getId()).thenReturn("s-up");
+ // Test config defaults for scaleDownBarrier, defaultProcessingRate,
scaleDownDuringTaskRolloverOnly
+ CostBasedAutoScalerConfig cfgWithDefaults =
CostBasedAutoScalerConfig.builder()
+
.taskCountMax(10)
+
.taskCountMin(1)
+
.enableTaskAutoScaler(true)
+
.build();
+ Assert.assertEquals(5, cfgWithDefaults.getScaleDownBarrier());
+ Assert.assertEquals(1000.0, cfgWithDefaults.getDefaultProcessingRate(),
0.001);
+ Assert.assertFalse(cfgWithDefaults.isScaleDownOnTaskRolloverOnly());
+
+ // Test custom config values
+ CostBasedAutoScalerConfig cfgWithCustom =
CostBasedAutoScalerConfig.builder()
+
.taskCountMax(10)
+
.taskCountMin(1)
+
.enableTaskAutoScaler(true)
+
.scaleDownBarrier(10)
+
.defaultProcessingRate(5000.0)
+
.scaleDownDuringTaskRolloverOnly(true)
+
.build();
+ Assert.assertEquals(10, cfgWithCustom.getScaleDownBarrier());
+ Assert.assertEquals(5000.0, cfgWithCustom.getDefaultProcessingRate(),
0.001);
+ Assert.assertTrue(cfgWithCustom.isScaleDownOnTaskRolloverOnly());
+
+ // computeTaskCountForRollover returns -1 when
scaleDownDuringTaskRolloverOnly=false (default)
when(spec.isSuspended()).thenReturn(false);
- when(supervisor.computeLagStats()).thenReturn(null);
- when(supervisor.getIoConfig()).thenReturn(ioConfig);
- when(ioConfig.getStream()).thenReturn("stream");
-
- CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder()
- .taskCountMax(10)
- .taskCountMin(1)
-
.enableTaskAutoScaler(true)
- .lagWeight(0.5)
- .idleWeight(0.5)
- .build();
-
- CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg,
spec, emitter);
+ CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor,
cfgWithDefaults, spec, emitter);
Assert.assertEquals(-1, scaler.computeTaskCountForRollover());
- }
-
- @Test
- public void testComputeTaskCountForRolloverReturnsMinusOneWhenNoMetrics()
- {
- // Tests the case where the lastKnownMetrics is null (no
computeTaskCountForScaleAction called)
- SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
- SeekableStreamSupervisor supervisor =
Mockito.mock(SeekableStreamSupervisor.class);
- ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
- SeekableStreamSupervisorIOConfig ioConfig =
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
- when(spec.getId()).thenReturn("s-up");
- when(spec.isSuspended()).thenReturn(false);
- when(supervisor.getIoConfig()).thenReturn(ioConfig);
- when(ioConfig.getStream()).thenReturn("stream");
-
- CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder()
- .taskCountMax(10)
- .taskCountMin(1)
-
.enableTaskAutoScaler(true)
- .lagWeight(0.5)
- .idleWeight(0.5)
- .build();
-
- CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg,
spec, emitter);
- // Should return -1 when lastKnownMetrics is null
- Assert.assertEquals(-1, scaler.computeTaskCountForRollover());
+ // computeTaskCountForRollover returns -1 when lastKnownMetrics is null
(even with scaleDownDuringTaskRolloverOnly=true)
+ CostBasedAutoScaler scalerWithRolloverOnly = new
CostBasedAutoScaler(supervisor, cfgWithCustom, spec, emitter);
+ Assert.assertEquals(-1,
scalerWithRolloverOnly.computeTaskCountForRollover());
}
private CostMetrics createMetrics(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]