This is an automated email from the ASF dual-hosted git repository.
tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 04c7606f494 Record task auto-scaling skip event when publishing task
set is non-empty (#18536)
04c7606f494 is described below
commit 04c7606f494aa0777fe5e6c6207fe5b42720cfe6
Author: jtuglu1 <[email protected]>
AuthorDate: Tue Sep 16 10:17:54 2025 -0700
Record task auto-scaling skip event when publishing task set is non-empty
(#18536)
This emits a metric when auto-scaling would have occurred but was skipped
because at least one task group was still PUBLISHING (pending completion).
This can help with tuning the stopTaskCount and stopTaskCountRatio config
fields.
---
.../supervisor/SeekableStreamSupervisor.java | 17 ++++--
.../SeekableStreamSupervisorSpecTest.java | 66 ++++++++++++++++++++++
2 files changed, 78 insertions(+), 5 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 0a37f54cd04..312a6480e25 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -472,6 +472,11 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
supervisorId,
dataSource
);
+ final Integer desiredTaskCount = computeDesiredTaskCount.call();
+ ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
+
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
+
.setDimension(DruidMetrics.DATASOURCE, dataSource)
+
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
for (CopyOnWriteArrayList<TaskGroup> list :
pendingCompletionTaskGroups.values()) {
if (!list.isEmpty()) {
log.info(
@@ -480,14 +485,16 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
dataSource,
list
);
+ if (desiredTaskCount > 0) {
+ emitter.emit(event.setDimension(
+ AUTOSCALER_SKIP_REASON_DIMENSION,
+ "There are tasks pending completion"
+ )
+ .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC,
desiredTaskCount));
+ }
return;
}
}
- final Integer desiredTaskCount = computeDesiredTaskCount.call();
- ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
-
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
-
.setDimension(DruidMetrics.DATASOURCE, dataSource)
- .setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
if (nowTime - dynamicTriggerLastRunTime <
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
log.info(
"DynamicAllocationTasksNotice submitted again in [%d] millis,
minTriggerDynamicFrequency is [%s] for supervisor[%s] for dataSource[%s],
skipping it! desired task count is [%s], active task count is [%s]",
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index f0ba8966626..e96e3887318 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -82,6 +82,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -851,6 +852,71 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
autoScaler.stop();
}
+ @Test
+ public void
test_dynamicAllocationNotice_skipsScalingAndEmitsReason_ifTasksArePublishing()
throws InterruptedException
+ {
+ EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
+
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1,
true)).anyTimes();
+
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.replay(spec);
+
+
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.replay(taskMaster);
+
+ StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
+ TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(10);
+
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+ supervisor,
+ DATASOURCE,
+ mapper.convertValue(
+ getScaleOutProperties(2),
+ LagBasedAutoScalerConfig.class
+ ),
+ spec,
+ dynamicActionEmitter
+ );
+
+ supervisor.addTaskGroupToPendingCompletionTaskGroup(
+ 0,
+ ImmutableMap.of("0", "0"),
+ null,
+ null,
+ Set.of("dummyTask"),
+ Collections.emptySet()
+ );
+
+ supervisor.start();
+ autoScaler.start();
+
+ supervisor.runInternal();
+ Thread.sleep(1000); // ensure a dynamic allocation notice completes
+
+ Assert.assertEquals(1, supervisor.getIoConfig().getTaskCount().intValue());
+ Assert.assertTrue(
+ dynamicActionEmitter
+
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+ .stream()
+ .map(metric ->
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
+ .filter(Objects::nonNull)
+ .anyMatch("There are tasks pending completion"::equals)
+ );
+
+
emitter.verifyNotEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC);
+ autoScaler.reset();
+ autoScaler.stop();
+ }
+
@Test
public void testSeekableStreamSupervisorSpecWithNoScalingOnIdleSupervisor()
throws InterruptedException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]