This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 194d7ffd883 Add metric task/autoScaler/scaleActionTime to task autoscaler (#17971)
194d7ffd883 is described below

commit 194d7ffd8837d0d6f806348d3bfe849ecaa1fa55
Author: jtuglu-netflix <[email protected]>
AuthorDate: Thu May 1 23:08:35 2025 -0700

    Add metric task/autoScaler/scaleActionTime to task autoscaler (#17971)
    
    Emit a metric `task/autoScaler/scaleActionTime` that measures, in milliseconds, how long the scale action took.
    This is useful for tuning the minimum time between scaling actions.
---
 docs/operations/metrics.md                              |  1 +
 .../supervisor/SeekableStreamSupervisor.java            | 17 +++++++++++++++--
 .../SeekableStreamSupervisorSpecTest.java               | 11 +++++++++--
 3 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index af59687e10d..936a6e572f3 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -284,6 +284,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
 |`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
 |`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.|
 |`task/autoScaler/requiredCount`|Count of required tasks based on the calculations of `lagBased` auto scaler.|`dataSource`, `stream`, `scalingSkipReason`|Depends on auto scaler config.|
+|`task/autoScaler/scaleActionTime`|Time taken in milliseconds to complete the scale action.|`dataSource`, `stream`, `tags`|Depends on auto scaler config.|
 
 If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0.
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index daaf2529cc4..e7b29fa23d4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -153,6 +153,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
   public static final String AUTOSCALER_SKIP_REASON_DIMENSION = 
"scalingSkipReason";
   public static final String AUTOSCALER_REQUIRED_TASKS_METRIC = 
"task/autoScaler/requiredCount";
+  public static final String AUTOSCALER_SCALING_TIME_METRIC = 
"task/autoScaler/scaleActionTime";
 
   private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
   private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
@@ -543,9 +544,21 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           desiredActiveTaskCount,
           dataSource
       );
+      final Stopwatch scaleActionStopwatch = Stopwatch.createStarted();
       gracefulShutdownInternal();
       changeTaskCountInIOConfig(desiredActiveTaskCount);
       clearAllocationInfo();
+      emitter.emit(ServiceMetricEvent.builder()
+                                     .setDimension(DruidMetrics.DATASOURCE, 
dataSource)
+                                     .setDimension(DruidMetrics.STREAM, 
getIoConfig().getStream())
+                                     .setDimensionIfNotNull(
+                                         DruidMetrics.TAGS,
+                                         
spec.getContextValue(DruidMetrics.TAGS)
+                                     )
+                                     .setMetric(
+                                         AUTOSCALER_SCALING_TIME_METRIC,
+                                         scaleActionStopwatch.millisElapsed()
+                                     ));
       log.info("Changed taskCount to [%s] for dataSource [%s].", 
desiredActiveTaskCount, dataSource);
       return true;
     }
@@ -4500,7 +4513,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       emitter.emit(
           ServiceMetricEvent.builder()
                             .setDimension("noticeType", noticeType)
-                            .setDimension("dataSource", dataSource)
+                            .setDimension(DruidMetrics.DATASOURCE, dataSource)
                             .setDimensionIfNotNull(DruidMetrics.TAGS, 
spec.getContextValue(DruidMetrics.TAGS))
                             .setMetric("ingest/notices/time", timeInMillis)
       );
@@ -4534,7 +4547,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     try {
       emitter.emit(
           ServiceMetricEvent.builder()
-                            .setDimension("dataSource", dataSource)
+                            .setDimension(DruidMetrics.DATASOURCE, dataSource)
                             .setDimensionIfNotNull(DruidMetrics.TAGS, 
spec.getContextValue(DruidMetrics.TAGS))
                             .setMetric("ingest/notices/queueSize", 
getNoticesQueueSize())
       );
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 1a704729daf..2d0dc315281 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -90,7 +90,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
   private TaskStorage taskStorage;
   private TaskMaster taskMaster;
   private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
-  private ServiceEmitter emitter;
+  private StubServiceEmitter emitter;
   private RowIngestionMetersFactory rowIngestionMetersFactory;
   private DataSchema dataSchema;
   private SeekableStreamSupervisorTuningConfig 
seekableStreamSupervisorTuningConfig;
@@ -116,7 +116,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
     taskStorage = EasyMock.mock(TaskStorage.class);
     taskMaster = EasyMock.mock(TaskMaster.class);
     indexerMetadataStorageCoordinator = 
EasyMock.mock(IndexerMetadataStorageCoordinator.class);
-    emitter = EasyMock.mock(ServiceEmitter.class);
+    emitter = new StubServiceEmitter();
     rowIngestionMetersFactory = EasyMock.mock(RowIngestionMetersFactory.class);
     dataSchema = EasyMock.mock(DataSchema.class);
     seekableStreamSupervisorTuningConfig = 
EasyMock.mock(SeekableStreamSupervisorTuningConfig.class);
@@ -788,6 +788,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
             .map(metric -> 
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
             .filter(Objects::nonNull)
             .anyMatch("minTriggerScaleActionFrequencyMillis not elapsed 
yet"::equals));
+    
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 
1);
     autoScaler.reset();
     autoScaler.stop();
   }
@@ -846,6 +847,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
             .map(metric -> 
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
             .filter(Objects::nonNull)
             .anyMatch("Already at max task count"::equals));
+    
emitter.verifyNotEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC);
 
     autoScaler.reset();
     autoScaler.stop();
@@ -940,6 +942,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
     Thread.sleep(1000);
     int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(2, taskCountAfterScaleOut);
+    
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 
1);
 
     autoScaler.reset();
     autoScaler.stop();
@@ -988,6 +991,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
     Thread.sleep(1000);
     int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(1, taskCountAfterScaleOut);
+    
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 
1);
 
     autoScaler.reset();
     autoScaler.stop();
@@ -1041,6 +1045,8 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
     Thread.sleep(2000);
     Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount());
 
+    
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 
1);
+
     autoScaler.reset();
     autoScaler.stop();
   }
@@ -1099,6 +1105,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
             .map(metric -> 
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
             .filter(Objects::nonNull)
             .anyMatch("Already at min task count"::equals));
+    
emitter.verifyNotEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC);
 
     autoScaler.reset();
     autoScaler.stop();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to