This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cf5da7b29e Harmonize metrics for lag and cost based autoscalers. (#19097)
2cf5da7b29e is described below

commit 2cf5da7b29e8dd3ba6dc5dde499fb8ff78f31d63
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Mar 6 12:18:19 2026 -0800

    Harmonize metrics for lag and cost based autoscalers. (#19097)
    
    This patch adjusts the autoscalers to emit metrics with supervisorId,
    dataSource, and stream. Previously, the lag-based autoscaler emitted
    only dataSource (which was actually the supervisor id) and stream, and
    the cost-based autoscaler emitted only supervisorId and stream.
---
 docs/operations/metrics.md                         |  7 +++-
 .../supervisor/autoscaler/CostBasedAutoScaler.java | 22 ++++++++---
 .../supervisor/autoscaler/LagBasedAutoScaler.java  | 45 ++++++++++++++--------
 .../autoscaler/LagBasedAutoScalerConfig.java       |  7 +++-
 .../SeekableStreamSupervisorSpecTest.java          | 18 ++-------
 .../autoscaler/CostBasedAutoScalerMockTest.java    |  1 +
 .../autoscaler/CostBasedAutoScalerTest.java        |  4 ++
 7 files changed, 63 insertions(+), 41 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 4e422696fc8..6b2da315b30 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -310,8 +310,11 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
 |`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`supervisorId`, `dataSource`, `tags`| < 1s |
 |`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
 |`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.|
-|`task/autoScaler/requiredCount`|Count of required tasks based on the calculations of `lagBased` auto scaler.|`supervisorId`, `dataSource`, `stream`, `scalingSkipReason`|Depends on auto scaler config.|
-|`task/autoScaler/scaleActionTime`|Time taken in milliseconds to complete the scale action.|`supervisorId`, `dataSource`, `stream`|Depends on auto scaler config.|
+|`task/autoScaler/requiredCount`|Count of required tasks based on the calculations of the auto scaler.|`supervisorId`, `dataSource`, `stream`, `scalingSkipReason`|Depends on auto scaler config.|
+|`task/autoScaler/scaleActionTime`|Time taken in milliseconds to complete the scale action.|`supervisorId`, `dataSource`, `stream`, `tags`|Depends on auto scaler config.|
+|`task/autoScaler/costBased/optimalTaskCount`|Optimal task count computed by the cost-based auto scaler.|`supervisorId`, `dataSource`, `stream`|Depends on auto scaler config.|
+|`task/autoScaler/costBased/lagCost`|Lag cost component of the cost-based auto scaler's cost function.|`supervisorId`, `dataSource`, `stream`|Depends on auto scaler config.|
+|`task/autoScaler/costBased/idleCost`|Idle cost component of the cost-based auto scaler's cost function.|`supervisorId`, `dataSource`, `stream`|Depends on auto scaler config.|
 
 If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0.
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 6351c00f6d8..9c42a6f35c9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -21,6 +21,7 @@ package 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
 
 import it.unimi.dsi.fastutil.ints.IntArraySet;
 import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
@@ -35,6 +36,7 @@ import 
org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.utils.CollectionUtils;
 
 import javax.annotation.Nullable;
 import java.util.Map;
@@ -99,12 +101,20 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     this.costFunction = new WeightedCostFunction();
     this.autoscalerExecutor = 
Execs.scheduledSingleThreaded("CostBasedAutoScaler-"
                                                             + 
StringUtils.encodeForFormat(spec.getId()));
-    this.metricBuilder = ServiceMetricEvent.builder()
-                                           
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
-                                           .setDimension(
-                                               DruidMetrics.STREAM,
-                                               
this.supervisor.getIoConfig().getStream()
-                                           );
+    this.metricBuilder =
+        ServiceMetricEvent.builder()
+                          .setDimension(DruidMetrics.SUPERVISOR_ID, 
supervisorId)
+                          .setDimension(
+                              DruidMetrics.DATASOURCE,
+                              CollectionUtils.getOnlyElement(
+                                  spec.getDataSources(),
+                                  xs -> DruidException.defensive("Expected one 
dataSource, got[%s]", xs)
+                              )
+                          )
+                          .setDimension(
+                              DruidMetrics.STREAM,
+                              this.supervisor.getIoConfig().getStream()
+                          );
   }
 
   @SuppressWarnings("unchecked")
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index 142193ae63f..bf35576fbd8 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
 
 import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
@@ -31,6 +32,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.utils.CollectionUtils;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -42,7 +44,6 @@ import java.util.concurrent.locks.ReentrantLock;
 public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
 {
   private static final EmittingLogger log = new 
EmittingLogger(LagBasedAutoScaler.class);
-  private final String dataSource;
   private final CircularFifoQueue<Long> lagMetricsQueue;
   private final ScheduledExecutorService lagComputationExec;
   private final ScheduledExecutorService allocationExec;
@@ -56,15 +57,17 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
 
   public LagBasedAutoScaler(
       SeekableStreamSupervisor supervisor,
-      String dataSource,
       LagBasedAutoScalerConfig autoScalerConfig,
       SupervisorSpec spec,
       ServiceEmitter emitter
   )
   {
     this.lagBasedAutoScalerConfig = autoScalerConfig;
+    final String dataSource = CollectionUtils.getOnlyElement(
+        spec.getDataSources(),
+        xs -> DruidException.defensive("Expected one dataSource, got[%s]", xs)
+    );
     final String supervisorId = StringUtils.format("Supervisor-%s", 
dataSource);
-    this.dataSource = dataSource;
     final int slots = (int) 
(lagBasedAutoScalerConfig.getLagCollectionRangeMillis() / 
lagBasedAutoScalerConfig
         .getLagCollectionIntervalMillis()) + 1;
     this.lagMetricsQueue = new CircularFifoQueue<>(slots);
@@ -74,6 +77,7 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
     this.supervisor = supervisor;
     this.emitter = emitter;
     metricBuilder = ServiceMetricEvent.builder()
+                                      
.setDimension(DruidMetrics.SUPERVISOR_ID, spec.getId())
                                       .setDimension(DruidMetrics.DATASOURCE, 
dataSource)
                                       .setDimension(DruidMetrics.STREAM, 
this.supervisor.getIoConfig().getStream());
   }
@@ -88,7 +92,7 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
         desiredTaskCount = computeDesiredTaskCount(new 
ArrayList<>(lagMetricsQueue));
       }
       catch (Exception ex) {
-        log.warn(ex, "Exception while computing desired task count for [%s]", 
dataSource);
+        log.warn(ex, "Exception while computing desired task count for 
supervisor[%s]", spec.getId());
       }
       finally {
         LOCK.unlock();
@@ -102,7 +106,7 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
         lagMetricsQueue.clear();
       }
       catch (Exception ex) {
-        log.warn(ex, "Exception while clearing lags for [%s]", dataSource);
+        log.warn(ex, "Exception while clearing lags for supervisor[%s]", 
spec.getId());
       }
       finally {
         LOCK.unlock();
@@ -123,11 +127,11 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
         TimeUnit.MILLISECONDS
     );
     log.info(
-        "LagBasedAutoScaler will collect lag every [%d] millis and will keep 
up to [%d] data points for the last [%d] millis for dataSource [%s]",
+        "LagBasedAutoScaler will collect lag every [%d] millis and will keep 
up to [%d] data points for the last [%d] millis for supervisor[%s]",
         lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
         lagMetricsQueue.maxSize(),
         lagBasedAutoScalerConfig.getLagCollectionRangeMillis(),
-        dataSource
+        spec.getId()
     );
   }
 
@@ -178,9 +182,9 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
           } else {
             lagMetricsQueue.offer(0L);
           }
-          log.debug("Current lags for dataSource[%s] are [%s].", dataSource, 
lagMetricsQueue);
+          log.debug("Current lags for supervisor[%s] are [%s].", spec.getId(), 
lagMetricsQueue);
         } else {
-          log.debug("Supervisor[%s] is suspended, skipping lag collection", 
dataSource);
+          log.debug("Supervisor[%s] is suspended, skipping lag collection", 
spec.getId());
         }
       }
       catch (Exception e) {
@@ -212,7 +216,11 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
   {
     // if the supervisor is not suspended, ensure required tasks are running
     // if suspended, ensure tasks have been requested to gracefully stop
-    log.debug("Computing the desired task count for [%s], based on following 
lags : [%s]", dataSource, lags);
+    log.debug(
+        "Computing the desired task count for supervisor[%s], based on 
following lags : [%s]",
+        spec.getId(),
+        lags
+    );
     int beyond = 0;
     int within = 0;
     int metricsCount = lags.size();
@@ -227,15 +235,15 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
     double beyondProportion = beyond * 1.0 / metricsCount;
     double withinProportion = within * 1.0 / metricsCount;
 
-    log.debug("Calculated beyondProportion is [%s] and withinProportion is 
[%s] for dataSource [%s].", beyondProportion,
-        withinProportion, dataSource
+    log.debug("Calculated beyondProportion is [%s] and withinProportion is 
[%s] for supervisor[%s].", beyondProportion,
+        withinProportion, spec.getId()
     );
 
     int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
     int desiredActiveTaskCount;
     int partitionCount = supervisor.getPartitionCount();
     if (partitionCount <= 0) {
-      log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+      log.warn("Partition number for supervisor[%s] <= 0 ? how can it be?", 
spec.getId());
       return -1;
     }
 
@@ -245,8 +253,9 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
 
       int actualTaskCountMax = 
Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
       if (currentActiveTaskCount == actualTaskCountMax) {
-        log.debug("CurrentActiveTaskCount reached task count Max limit, 
skipping scale out action for dataSource [%s].",
-            dataSource
+        log.debug(
+            "CurrentActiveTaskCount reached task count Max limit, skipping 
scale out action for supervisor[%s].",
+            spec.getId()
         );
         emitter.emit(metricBuilder
                          .setDimension(
@@ -266,8 +275,10 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
       int taskCount = currentActiveTaskCount - 
lagBasedAutoScalerConfig.getScaleInStep();
       int actualTaskCountMin = 
Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
       if (currentActiveTaskCount == actualTaskCountMin) {
-        log.debug("CurrentActiveTaskCount reached task count Min limit, 
skipping scale in action for dataSource[%s].",
-            dataSource
+        log.debug(
+            "CurrentActiveTaskCount reached task count Min limit[%d], skipping 
scale in action for supervisor[%s].",
+            actualTaskCountMin,
+            spec.getId()
         );
         emitter.emit(metricBuilder
                          .setDimension(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
index ad036dd0e10..c53ac0e379c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
@@ -186,7 +186,12 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
   @Override
   public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, 
SupervisorSpec spec, ServiceEmitter emitter)
   {
-    return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor, 
spec.getId(), this, spec, emitter);
+    return new LagBasedAutoScaler(
+        (SeekableStreamSupervisor) supervisor,
+        this,
+        spec,
+        emitter
+    );
   }
 
   @JsonProperty
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index d2660937f8a..baa5ca24050 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -79,6 +79,8 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
   {
     ingestionSchema = 
EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
     dataSchema = EasyMock.mock(DataSchema.class);
+    
EasyMock.expect(dataSchema.getDataSource()).andReturn(DATASOURCE).anyTimes();
+    EasyMock.replay(dataSchema);
     seekableStreamSupervisorTuningConfig = 
EasyMock.mock(SeekableStreamSupervisorTuningConfig.class);
     seekableStreamSupervisorIOConfig = 
EasyMock.mock(SeekableStreamSupervisorIOConfig.class);
     supervisorConfig = new SupervisorStateManagerConfig();
@@ -89,6 +91,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     supervisor4 = EasyMock.mock(SeekableStreamSupervisor.class);
 
     
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(null).anyTimes();
+    
EasyMock.expect(spec.getDataSources()).andReturn(ImmutableList.of(DATASOURCE)).anyTimes();
   }
 
   @Test
@@ -416,7 +419,6 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
 
     LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
         supervisor,
-        DATASOURCE,
         mapper.convertValue(
             getScaleOutProperties(10),
             LagBasedAutoScalerConfig.class
@@ -479,7 +481,6 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
 
     LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
         supervisor,
-        DATASOURCE,
         mapper.convertValue(
             getScaleOutProperties(2),
             LagBasedAutoScalerConfig.class
@@ -534,7 +535,6 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
 
     LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
         supervisor,
-        DATASOURCE,
         mapper.convertValue(
             getScaleOutProperties(2),
             LagBasedAutoScalerConfig.class
@@ -580,7 +580,6 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(2);
     LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
         supervisor,
-        DATASOURCE,
         mapper.convertValue(
             getScaleOutProperties(3),
             LagBasedAutoScalerConfig.class
@@ -628,7 +627,6 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(3);
     LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
         supervisor,
-        DATASOURCE,
         mapper.convertValue(
             getScaleInProperties(),
             LagBasedAutoScalerConfig.class
@@ -686,7 +684,6 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
 
     LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
         supervisor,
-        DATASOURCE,
         mapper.convertValue(
             modifiedScaleInProps,
             LagBasedAutoScalerConfig.class
@@ -749,7 +746,6 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
 
     LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
         supervisor,
-        DATASOURCE,
         mapper.convertValue(
             getScaleInProperties(),
             LagBasedAutoScalerConfig.class
@@ -861,9 +857,6 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
     
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
     EasyMock.replay(ingestionSchema);
-    EasyMock.expect(dataSchema.getDataSource()).andReturn(DATASOURCE);
-    EasyMock.replay(dataSchema);
-
     spec = new SeekableStreamSupervisorSpec(
         SUPERVISOR,
         ingestionSchema,
@@ -1208,7 +1201,6 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
 
     LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
         supervisor,
-        DATASOURCE,
         mapper.convertValue(
             getScaleOutProperties(2),
             LagBasedAutoScalerConfig.class
@@ -1274,7 +1266,6 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(3);
     LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
         supervisor,
-        DATASOURCE,
         mapper.convertValue(
             getScaleOutProperties(2),
             LagBasedAutoScalerConfig.class
@@ -1328,7 +1319,6 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(10);
     LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
         supervisor,
-        DATASOURCE,
         mapper.convertValue(
             getScaleOutProperties(2),
             LagBasedAutoScalerConfig.class
@@ -1446,11 +1436,9 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
   private void mockIngestionSchema()
   {
     
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
-    
EasyMock.expect(dataSchema.getDataSource()).andReturn(DATASOURCE).anyTimes();
     
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
     
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
     EasyMock.replay(ingestionSchema);
-    EasyMock.replay(dataSchema);
   }
 
   private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean 
scaleOut)
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
index 218b429183b..45c1c2b772e 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
@@ -58,6 +58,7 @@ public class CostBasedAutoScalerMockTest
     mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);
 
     when(mockSpec.getId()).thenReturn(SUPERVISOR_ID);
+    
when(mockSpec.getDataSources()).thenReturn(java.util.List.of("test-datasource"));
     when(mockSpec.isSuspended()).thenReturn(false);
     when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
     when(mockIoConfig.getStream()).thenReturn(STREAM_NAME);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 1fcb78e450e..86504458b75 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -34,6 +34,7 @@ import org.mockito.Mockito;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME;
@@ -56,6 +57,7 @@ public class CostBasedAutoScalerTest
     SeekableStreamSupervisorIOConfig mockIoConfig = 
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
 
     when(mockSupervisorSpec.getId()).thenReturn("test-supervisor");
+    
when(mockSupervisorSpec.getDataSources()).thenReturn(List.of("test-datasource"));
     when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
     when(mockIoConfig.getStream()).thenReturn("test-stream");
 
@@ -433,6 +435,7 @@ public class CostBasedAutoScalerTest
     SeekableStreamSupervisorIOConfig ioConfig = 
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
 
     when(spec.getId()).thenReturn("s-up");
+    when(spec.getDataSources()).thenReturn(List.of("test-datasource"));
     when(supervisor.getIoConfig()).thenReturn(ioConfig);
     when(ioConfig.getStream()).thenReturn("stream");
 
@@ -477,6 +480,7 @@ public class CostBasedAutoScalerTest
     SeekableStreamSupervisorIOConfig ioConfig = 
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
 
     when(spec.getId()).thenReturn("test-supervisor");
+    when(spec.getDataSources()).thenReturn(List.of("test-datasource"));
     when(spec.isSuspended()).thenReturn(false);
     when(supervisor.getIoConfig()).thenReturn(ioConfig);
     when(ioConfig.getStream()).thenReturn("test-stream");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to