This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 194d7ffd883 Add metric task/autoScaler/scaleActionTime to task
autoscaler (#17971)
194d7ffd883 is described below
commit 194d7ffd8837d0d6f806348d3bfe849ecaa1fa55
Author: jtuglu-netflix <[email protected]>
AuthorDate: Thu May 1 23:08:35 2025 -0700
Add metric task/autoScaler/scaleActionTime to task autoscaler (#17971)
Emit a metric `task/autoScaler/scaleActionTime` that measures, in milliseconds,
how long a scale action took.
This is useful for tuning the minimum time between scaling actions
(`minTriggerScaleActionFrequencyMillis`).
---
docs/operations/metrics.md | 1 +
.../supervisor/SeekableStreamSupervisor.java | 17 +++++++++++++++--
.../SeekableStreamSupervisorSpecTest.java | 11 +++++++++--
3 files changed, 25 insertions(+), 4 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index af59687e10d..936a6e572f3 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -284,6 +284,7 @@ batch ingestion emit the following metrics. These metrics
are deltas for each em
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without
ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of
segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the
coordinator cycle time.|
|`task/autoScaler/requiredCount`|Count of required tasks based on the
calculations of `lagBased` auto scaler.|`dataSource`, `stream`,
`scalingSkipReason`|Depends on auto scaler config.|
+|`task/autoScaler/scaleActionTime`|Time taken in milliseconds to complete the
scale action.|`dataSource`, `stream`, `tags`|Depends on auto scaler config.|
If the JVM does not support CPU time measurement for the current thread,
`ingest/merge/cpu` and `ingest/persists/cpu` will be 0.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index daaf2529cc4..e7b29fa23d4 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -153,6 +153,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
public static final String AUTOSCALER_SKIP_REASON_DIMENSION =
"scalingSkipReason";
public static final String AUTOSCALER_REQUIRED_TASKS_METRIC =
"task/autoScaler/requiredCount";
+ public static final String AUTOSCALER_SCALING_TIME_METRIC =
"task/autoScaler/scaleActionTime";
private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
@@ -543,9 +544,21 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
desiredActiveTaskCount,
dataSource
);
+ final Stopwatch scaleActionStopwatch = Stopwatch.createStarted();
gracefulShutdownInternal();
changeTaskCountInIOConfig(desiredActiveTaskCount);
clearAllocationInfo();
+ emitter.emit(ServiceMetricEvent.builder()
+ .setDimension(DruidMetrics.DATASOURCE,
dataSource)
+ .setDimension(DruidMetrics.STREAM,
getIoConfig().getStream())
+ .setDimensionIfNotNull(
+ DruidMetrics.TAGS,
+
spec.getContextValue(DruidMetrics.TAGS)
+ )
+ .setMetric(
+ AUTOSCALER_SCALING_TIME_METRIC,
+ scaleActionStopwatch.millisElapsed()
+ ));
log.info("Changed taskCount to [%s] for dataSource [%s].",
desiredActiveTaskCount, dataSource);
return true;
}
@@ -4500,7 +4513,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("noticeType", noticeType)
- .setDimension("dataSource", dataSource)
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimensionIfNotNull(DruidMetrics.TAGS,
spec.getContextValue(DruidMetrics.TAGS))
.setMetric("ingest/notices/time", timeInMillis)
);
@@ -4534,7 +4547,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
try {
emitter.emit(
ServiceMetricEvent.builder()
- .setDimension("dataSource", dataSource)
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimensionIfNotNull(DruidMetrics.TAGS,
spec.getContextValue(DruidMetrics.TAGS))
.setMetric("ingest/notices/queueSize",
getNoticesQueueSize())
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 1a704729daf..2d0dc315281 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -90,7 +90,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
private TaskStorage taskStorage;
private TaskMaster taskMaster;
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
- private ServiceEmitter emitter;
+ private StubServiceEmitter emitter;
private RowIngestionMetersFactory rowIngestionMetersFactory;
private DataSchema dataSchema;
private SeekableStreamSupervisorTuningConfig
seekableStreamSupervisorTuningConfig;
@@ -116,7 +116,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
taskStorage = EasyMock.mock(TaskStorage.class);
taskMaster = EasyMock.mock(TaskMaster.class);
indexerMetadataStorageCoordinator =
EasyMock.mock(IndexerMetadataStorageCoordinator.class);
- emitter = EasyMock.mock(ServiceEmitter.class);
+ emitter = new StubServiceEmitter();
rowIngestionMetersFactory = EasyMock.mock(RowIngestionMetersFactory.class);
dataSchema = EasyMock.mock(DataSchema.class);
seekableStreamSupervisorTuningConfig =
EasyMock.mock(SeekableStreamSupervisorTuningConfig.class);
@@ -788,6 +788,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
.map(metric ->
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
.filter(Objects::nonNull)
.anyMatch("minTriggerScaleActionFrequencyMillis not elapsed
yet"::equals));
+
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC,
1);
autoScaler.reset();
autoScaler.stop();
}
@@ -846,6 +847,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
.map(metric ->
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
.filter(Objects::nonNull)
.anyMatch("Already at max task count"::equals));
+
emitter.verifyNotEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC);
autoScaler.reset();
autoScaler.stop();
@@ -940,6 +942,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
Thread.sleep(1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScaleOut);
+
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC,
1);
autoScaler.reset();
autoScaler.stop();
@@ -988,6 +991,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
Thread.sleep(1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountAfterScaleOut);
+
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC,
1);
autoScaler.reset();
autoScaler.stop();
@@ -1041,6 +1045,8 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
Thread.sleep(2000);
Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount());
+
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC,
1);
+
autoScaler.reset();
autoScaler.stop();
}
@@ -1099,6 +1105,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
.map(metric ->
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
.filter(Objects::nonNull)
.anyMatch("Already at min task count"::equals));
+
emitter.verifyNotEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC);
autoScaler.reset();
autoScaler.stop();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]