This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2cf5da7b29e Harmonize metrics for lag and cost based autoscalers.
(#19097)
2cf5da7b29e is described below
commit 2cf5da7b29e8dd3ba6dc5dde499fb8ff78f31d63
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Mar 6 12:18:19 2026 -0800
Harmonize metrics for lag and cost based autoscalers. (#19097)
This patch adjusts the autoscalers to emit metrics with supervisorId,
dataSource, and stream. Previously, the lag-based autoscaler emitted
only dataSource (which was actually the supervisor id) and stream, and
the cost-based autoscaler emitted only supervisorId and stream.
---
docs/operations/metrics.md | 7 +++-
.../supervisor/autoscaler/CostBasedAutoScaler.java | 22 ++++++++---
.../supervisor/autoscaler/LagBasedAutoScaler.java | 45 ++++++++++++++--------
.../autoscaler/LagBasedAutoScalerConfig.java | 7 +++-
.../SeekableStreamSupervisorSpecTest.java | 18 ++-------
.../autoscaler/CostBasedAutoScalerMockTest.java | 1 +
.../autoscaler/CostBasedAutoScalerTest.java | 4 ++
7 files changed, 63 insertions(+), 41 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 4e422696fc8..6b2da315b30 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -310,8 +310,11 @@ batch ingestion emit the following metrics. These metrics
are deltas for each em
|`ingest/notices/time`|Milliseconds taken to process a notice by the
supervisor.|`supervisorId`, `dataSource`, `tags`| < 1s |
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without
ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of
segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the
coordinator cycle time.|
-|`task/autoScaler/requiredCount`|Count of required tasks based on the
calculations of `lagBased` auto scaler.|`supervisorId`, `dataSource`, `stream`,
`scalingSkipReason`|Depends on auto scaler config.|
-|`task/autoScaler/scaleActionTime`|Time taken in milliseconds to complete the
scale action.|`supervisorId`, `dataSource`, `stream`|Depends on auto scaler
config.|
+|`task/autoScaler/requiredCount`|Count of required tasks based on the
calculations of the auto scaler.|`supervisorId`, `dataSource`, `stream`,
`scalingSkipReason`|Depends on auto scaler config.|
+|`task/autoScaler/scaleActionTime`|Time taken in milliseconds to complete the
scale action.|`supervisorId`, `dataSource`, `stream`, `tags`|Depends on auto
scaler config.|
+|`task/autoScaler/costBased/optimalTaskCount`|Optimal task count computed by
the cost-based auto scaler.|`supervisorId`, `dataSource`, `stream`|Depends on
auto scaler config.|
+|`task/autoScaler/costBased/lagCost`|Lag cost component of the cost-based auto
scaler's cost function.|`supervisorId`, `dataSource`, `stream`|Depends on auto
scaler config.|
+|`task/autoScaler/costBased/idleCost`|Idle cost component of the cost-based
auto scaler's cost function.|`supervisorId`, `dataSource`, `stream`|Depends on
auto scaler config.|
If the JVM does not support CPU time measurement for the current thread,
`ingest/merge/cpu` and `ingest/persists/cpu` will be 0.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 6351c00f6d8..9c42a6f35c9 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -21,6 +21,7 @@ package
org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
import it.unimi.dsi.fastutil.ints.IntArraySet;
import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
@@ -35,6 +36,7 @@ import
org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.util.Map;
@@ -99,12 +101,20 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
this.costFunction = new WeightedCostFunction();
this.autoscalerExecutor =
Execs.scheduledSingleThreaded("CostBasedAutoScaler-"
+
StringUtils.encodeForFormat(spec.getId()));
- this.metricBuilder = ServiceMetricEvent.builder()
-
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
- .setDimension(
- DruidMetrics.STREAM,
-
this.supervisor.getIoConfig().getStream()
- );
+ this.metricBuilder =
+ ServiceMetricEvent.builder()
+ .setDimension(DruidMetrics.SUPERVISOR_ID,
supervisorId)
+ .setDimension(
+ DruidMetrics.DATASOURCE,
+ CollectionUtils.getOnlyElement(
+ spec.getDataSources(),
+ xs -> DruidException.defensive("Expected one
dataSource, got[%s]", xs)
+ )
+ )
+ .setDimension(
+ DruidMetrics.STREAM,
+ this.supervisor.getIoConfig().getStream()
+ );
}
@SuppressWarnings("unchecked")
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index 142193ae63f..bf35576fbd8 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import
org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
@@ -31,6 +32,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
@@ -42,7 +44,6 @@ import java.util.concurrent.locks.ReentrantLock;
public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
{
private static final EmittingLogger log = new
EmittingLogger(LagBasedAutoScaler.class);
- private final String dataSource;
private final CircularFifoQueue<Long> lagMetricsQueue;
private final ScheduledExecutorService lagComputationExec;
private final ScheduledExecutorService allocationExec;
@@ -56,15 +57,17 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
public LagBasedAutoScaler(
SeekableStreamSupervisor supervisor,
- String dataSource,
LagBasedAutoScalerConfig autoScalerConfig,
SupervisorSpec spec,
ServiceEmitter emitter
)
{
this.lagBasedAutoScalerConfig = autoScalerConfig;
+ final String dataSource = CollectionUtils.getOnlyElement(
+ spec.getDataSources(),
+ xs -> DruidException.defensive("Expected one dataSource, got[%s]", xs)
+ );
final String supervisorId = StringUtils.format("Supervisor-%s",
dataSource);
- this.dataSource = dataSource;
final int slots = (int)
(lagBasedAutoScalerConfig.getLagCollectionRangeMillis() /
lagBasedAutoScalerConfig
.getLagCollectionIntervalMillis()) + 1;
this.lagMetricsQueue = new CircularFifoQueue<>(slots);
@@ -74,6 +77,7 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
this.supervisor = supervisor;
this.emitter = emitter;
metricBuilder = ServiceMetricEvent.builder()
+
.setDimension(DruidMetrics.SUPERVISOR_ID, spec.getId())
.setDimension(DruidMetrics.DATASOURCE,
dataSource)
.setDimension(DruidMetrics.STREAM,
this.supervisor.getIoConfig().getStream());
}
@@ -88,7 +92,7 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
desiredTaskCount = computeDesiredTaskCount(new
ArrayList<>(lagMetricsQueue));
}
catch (Exception ex) {
- log.warn(ex, "Exception while computing desired task count for [%s]",
dataSource);
+ log.warn(ex, "Exception while computing desired task count for
supervisor[%s]", spec.getId());
}
finally {
LOCK.unlock();
@@ -102,7 +106,7 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
lagMetricsQueue.clear();
}
catch (Exception ex) {
- log.warn(ex, "Exception while clearing lags for [%s]", dataSource);
+ log.warn(ex, "Exception while clearing lags for supervisor[%s]",
spec.getId());
}
finally {
LOCK.unlock();
@@ -123,11 +127,11 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
TimeUnit.MILLISECONDS
);
log.info(
- "LagBasedAutoScaler will collect lag every [%d] millis and will keep
up to [%d] data points for the last [%d] millis for dataSource [%s]",
+ "LagBasedAutoScaler will collect lag every [%d] millis and will keep
up to [%d] data points for the last [%d] millis for supervisor[%s]",
lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
lagMetricsQueue.maxSize(),
lagBasedAutoScalerConfig.getLagCollectionRangeMillis(),
- dataSource
+ spec.getId()
);
}
@@ -178,9 +182,9 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
} else {
lagMetricsQueue.offer(0L);
}
- log.debug("Current lags for dataSource[%s] are [%s].", dataSource,
lagMetricsQueue);
+ log.debug("Current lags for supervisor[%s] are [%s].", spec.getId(),
lagMetricsQueue);
} else {
- log.debug("Supervisor[%s] is suspended, skipping lag collection",
dataSource);
+ log.debug("Supervisor[%s] is suspended, skipping lag collection",
spec.getId());
}
}
catch (Exception e) {
@@ -212,7 +216,11 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
{
// if the supervisor is not suspended, ensure required tasks are running
// if suspended, ensure tasks have been requested to gracefully stop
- log.debug("Computing the desired task count for [%s], based on following
lags : [%s]", dataSource, lags);
+ log.debug(
+ "Computing the desired task count for supervisor[%s], based on
following lags : [%s]",
+ spec.getId(),
+ lags
+ );
int beyond = 0;
int within = 0;
int metricsCount = lags.size();
@@ -227,15 +235,15 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
double beyondProportion = beyond * 1.0 / metricsCount;
double withinProportion = within * 1.0 / metricsCount;
- log.debug("Calculated beyondProportion is [%s] and withinProportion is
[%s] for dataSource [%s].", beyondProportion,
- withinProportion, dataSource
+ log.debug("Calculated beyondProportion is [%s] and withinProportion is
[%s] for supervisor[%s].", beyondProportion,
+ withinProportion, spec.getId()
);
int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
int desiredActiveTaskCount;
int partitionCount = supervisor.getPartitionCount();
if (partitionCount <= 0) {
- log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+ log.warn("Partition number for supervisor[%s] <= 0 ? how can it be?",
spec.getId());
return -1;
}
@@ -245,8 +253,9 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
int actualTaskCountMax =
Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
if (currentActiveTaskCount == actualTaskCountMax) {
- log.debug("CurrentActiveTaskCount reached task count Max limit,
skipping scale out action for dataSource [%s].",
- dataSource
+ log.debug(
+ "CurrentActiveTaskCount reached task count Max limit, skipping
scale out action for supervisor[%s].",
+ spec.getId()
);
emitter.emit(metricBuilder
.setDimension(
@@ -266,8 +275,10 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
int taskCount = currentActiveTaskCount -
lagBasedAutoScalerConfig.getScaleInStep();
int actualTaskCountMin =
Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
if (currentActiveTaskCount == actualTaskCountMin) {
- log.debug("CurrentActiveTaskCount reached task count Min limit,
skipping scale in action for dataSource[%s].",
- dataSource
+ log.debug(
+ "CurrentActiveTaskCount reached task count Min limit[%d], skipping
scale in action for supervisor[%s].",
+ actualTaskCountMin,
+ spec.getId()
);
emitter.emit(metricBuilder
.setDimension(
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
index ad036dd0e10..c53ac0e379c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
@@ -186,7 +186,12 @@ public class LagBasedAutoScalerConfig implements
AutoScalerConfig
@Override
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor,
SupervisorSpec spec, ServiceEmitter emitter)
{
- return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor,
spec.getId(), this, spec, emitter);
+ return new LagBasedAutoScaler(
+ (SeekableStreamSupervisor) supervisor,
+ this,
+ spec,
+ emitter
+ );
}
@JsonProperty
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index d2660937f8a..baa5ca24050 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -79,6 +79,8 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
{
ingestionSchema =
EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
dataSchema = EasyMock.mock(DataSchema.class);
+
EasyMock.expect(dataSchema.getDataSource()).andReturn(DATASOURCE).anyTimes();
+ EasyMock.replay(dataSchema);
seekableStreamSupervisorTuningConfig =
EasyMock.mock(SeekableStreamSupervisorTuningConfig.class);
seekableStreamSupervisorIOConfig =
EasyMock.mock(SeekableStreamSupervisorIOConfig.class);
supervisorConfig = new SupervisorStateManagerConfig();
@@ -89,6 +91,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
supervisor4 = EasyMock.mock(SeekableStreamSupervisor.class);
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(null).anyTimes();
+
EasyMock.expect(spec.getDataSources()).andReturn(ImmutableList.of(DATASOURCE)).anyTimes();
}
@Test
@@ -416,7 +419,6 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
- DATASOURCE,
mapper.convertValue(
getScaleOutProperties(10),
LagBasedAutoScalerConfig.class
@@ -479,7 +481,6 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
- DATASOURCE,
mapper.convertValue(
getScaleOutProperties(2),
LagBasedAutoScalerConfig.class
@@ -534,7 +535,6 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
- DATASOURCE,
mapper.convertValue(
getScaleOutProperties(2),
LagBasedAutoScalerConfig.class
@@ -580,7 +580,6 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(2);
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
- DATASOURCE,
mapper.convertValue(
getScaleOutProperties(3),
LagBasedAutoScalerConfig.class
@@ -628,7 +627,6 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(3);
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
- DATASOURCE,
mapper.convertValue(
getScaleInProperties(),
LagBasedAutoScalerConfig.class
@@ -686,7 +684,6 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
- DATASOURCE,
mapper.convertValue(
modifiedScaleInProps,
LagBasedAutoScalerConfig.class
@@ -749,7 +746,6 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
- DATASOURCE,
mapper.convertValue(
getScaleInProperties(),
LagBasedAutoScalerConfig.class
@@ -861,9 +857,6 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.replay(ingestionSchema);
- EasyMock.expect(dataSchema.getDataSource()).andReturn(DATASOURCE);
- EasyMock.replay(dataSchema);
-
spec = new SeekableStreamSupervisorSpec(
SUPERVISOR,
ingestionSchema,
@@ -1208,7 +1201,6 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
- DATASOURCE,
mapper.convertValue(
getScaleOutProperties(2),
LagBasedAutoScalerConfig.class
@@ -1274,7 +1266,6 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(3);
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
- DATASOURCE,
mapper.convertValue(
getScaleOutProperties(2),
LagBasedAutoScalerConfig.class
@@ -1328,7 +1319,6 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(10);
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
- DATASOURCE,
mapper.convertValue(
getScaleOutProperties(2),
LagBasedAutoScalerConfig.class
@@ -1446,11 +1436,9 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
private void mockIngestionSchema()
{
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
-
EasyMock.expect(dataSchema.getDataSource()).andReturn(DATASOURCE).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
EasyMock.replay(ingestionSchema);
- EasyMock.replay(dataSchema);
}
private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean
scaleOut)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
index 218b429183b..45c1c2b772e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
@@ -58,6 +58,7 @@ public class CostBasedAutoScalerMockTest
mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);
when(mockSpec.getId()).thenReturn(SUPERVISOR_ID);
+
when(mockSpec.getDataSources()).thenReturn(java.util.List.of("test-datasource"));
when(mockSpec.isSuspended()).thenReturn(false);
when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
when(mockIoConfig.getStream()).thenReturn(STREAM_NAME);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 1fcb78e450e..86504458b75 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -34,6 +34,7 @@ import org.mockito.Mockito;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME;
@@ -56,6 +57,7 @@ public class CostBasedAutoScalerTest
SeekableStreamSupervisorIOConfig mockIoConfig =
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
when(mockSupervisorSpec.getId()).thenReturn("test-supervisor");
+
when(mockSupervisorSpec.getDataSources()).thenReturn(List.of("test-datasource"));
when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
when(mockIoConfig.getStream()).thenReturn("test-stream");
@@ -433,6 +435,7 @@ public class CostBasedAutoScalerTest
SeekableStreamSupervisorIOConfig ioConfig =
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
when(spec.getId()).thenReturn("s-up");
+ when(spec.getDataSources()).thenReturn(List.of("test-datasource"));
when(supervisor.getIoConfig()).thenReturn(ioConfig);
when(ioConfig.getStream()).thenReturn("stream");
@@ -477,6 +480,7 @@ public class CostBasedAutoScalerTest
SeekableStreamSupervisorIOConfig ioConfig =
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
when(spec.getId()).thenReturn("test-supervisor");
+ when(spec.getDataSources()).thenReturn(List.of("test-datasource"));
when(spec.isSuspended()).thenReturn(false);
when(supervisor.getIoConfig()).thenReturn(ioConfig);
when(ioConfig.getStream()).thenReturn("test-stream");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]