This is an automated email from the ASF dual-hosted git repository.

tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 04c7606f494 Record task auto-scaling skip event when publishing task set is non-empty (#18536)
04c7606f494 is described below

commit 04c7606f494aa0777fe5e6c6207fe5b42720cfe6
Author: jtuglu1 <[email protected]>
AuthorDate: Tue Sep 16 10:17:54 2025 -0700

    Record task auto-scaling skip event when publishing task set is non-empty (#18536)
    
    This emits a metric when auto-scaling would have occurred, but at least one
    PUBLISHING task group forced the auto-scaler to bail. This can help with
    tuning the stopTaskCount and stopTaskCountRatio config fields.
---
 .../supervisor/SeekableStreamSupervisor.java       | 17 ++++--
 .../SeekableStreamSupervisorSpecTest.java          | 66 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 5 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 0a37f54cd04..312a6480e25 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -472,6 +472,11 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                     supervisorId,
                     dataSource
           );
+          final Integer desiredTaskCount = computeDesiredTaskCount.call();
+          ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
+                                                               
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
+                                                               
.setDimension(DruidMetrics.DATASOURCE, dataSource)
+                                                               
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
           for (CopyOnWriteArrayList<TaskGroup> list : 
pendingCompletionTaskGroups.values()) {
             if (!list.isEmpty()) {
               log.info(
@@ -480,14 +485,16 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                   dataSource,
                   list
               );
+              if (desiredTaskCount > 0) {
+                emitter.emit(event.setDimension(
+                                      AUTOSCALER_SKIP_REASON_DIMENSION,
+                                      "There are tasks pending completion"
+                                  )
+                                  .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, 
desiredTaskCount));
+              }
               return;
             }
           }
-          final Integer desiredTaskCount = computeDesiredTaskCount.call();
-          ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
-                                                               
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
-                                                               
.setDimension(DruidMetrics.DATASOURCE, dataSource)
-              .setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
           if (nowTime - dynamicTriggerLastRunTime < 
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
             log.info(
                 "DynamicAllocationTasksNotice submitted again in [%d] millis, 
minTriggerDynamicFrequency is [%s] for supervisor[%s] for dataSource[%s], 
skipping it! desired task count is [%s], active task count is [%s]",
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index f0ba8966626..e96e3887318 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -82,6 +82,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -851,6 +852,71 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     autoScaler.stop();
   }
 
+  @Test
+  public void 
test_dynamicAllocationNotice_skipsScalingAndEmitsReason_ifTasksArePublishing() 
throws InterruptedException
+  {
+    EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
+    
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+    
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, 
true)).anyTimes();
+    
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    EasyMock.replay(spec);
+
+    
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.replay(taskMaster);
+
+    StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
+    TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(10);
+
+    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+        supervisor,
+        DATASOURCE,
+        mapper.convertValue(
+            getScaleOutProperties(2),
+            LagBasedAutoScalerConfig.class
+        ),
+        spec,
+        dynamicActionEmitter
+    );
+
+    supervisor.addTaskGroupToPendingCompletionTaskGroup(
+        0,
+        ImmutableMap.of("0", "0"),
+        null,
+        null,
+        Set.of("dummyTask"),
+        Collections.emptySet()
+    );
+
+    supervisor.start();
+    autoScaler.start();
+
+    supervisor.runInternal();
+    Thread.sleep(1000); // ensure a dynamic allocation notice completes
+
+    Assert.assertEquals(1, supervisor.getIoConfig().getTaskCount().intValue());
+    Assert.assertTrue(
+        dynamicActionEmitter
+            
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+            .stream()
+            .map(metric -> 
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
+            .filter(Objects::nonNull)
+            .anyMatch("There are tasks pending completion"::equals)
+    );
+
+    
emitter.verifyNotEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC);
+    autoScaler.reset();
+    autoScaler.stop();
+  }
+
   @Test
   public void testSeekableStreamSupervisorSpecWithNoScalingOnIdleSupervisor() 
throws InterruptedException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to