This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a65b2d4f41e Visibility into LagBased AutoScaler desired task count (#16199)
a65b2d4f41e is described below

commit a65b2d4f41e495a3f032cf3f96dc01005d9d2025
Author: Adithya Chakilam <[email protected]>
AuthorDate: Wed Mar 27 12:08:00 2024 -0500

    Visibility into LagBased AutoScaler desired task count (#16199)
    
    * Visibility into skipped scale notices
    
    * comments
    
    * change to emit always instead of just skips
    
    * fix failing test
    
    * comments
    
    * Add couple more tests
---
 docs/operations/metrics.md                         |   1 +
 .../supervisor/SeekableStreamSupervisor.java       |  36 ++++-
 .../supervisor/SeekableStreamSupervisorSpec.java   |   2 +-
 .../supervisor/autoscaler/AutoScalerConfig.java    |   3 +-
 .../supervisor/autoscaler/LagBasedAutoScaler.java  |  31 ++++-
 .../autoscaler/LagBasedAutoScalerConfig.java       |   5 +-
 .../SeekableStreamSupervisorSpecTest.java          | 154 +++++++++++++++++++--
 7 files changed, 210 insertions(+), 22 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 0c7a492d807..e49b9600181 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -258,6 +258,7 @@ batch ingestion emit the following metrics. These metrics 
are deltas for each em
 |`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s |
 |`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
 |`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.|
+|`task/autoScaler/requiredCount`|Count of required tasks based on the calculations of `lagBased` auto scaler.|`dataSource`, `stream`, `scalingSkipReason`|Depends on auto scaler config.|
 
 If the JVM does not support CPU time measurement for the current thread, 
`ingest/merge/cpu` and `ingest/persists/cpu` will be 0.
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 507001a7af6..59cd05bf349 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -149,6 +149,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     implements Supervisor
 {
   public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
+  public static final String AUTOSCALER_SKIP_REASON_DIMENSION = 
"scalingSkipReason";
+  public static final String AUTOSCALER_REQUIRED_TASKS_METRIC = 
"task/autoScaler/requiredCount";
 
   private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
   private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
@@ -403,11 +405,13 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   private class DynamicAllocationTasksNotice implements Notice
   {
     Callable<Integer> scaleAction;
+    ServiceEmitter emitter;
     private static final String TYPE = "dynamic_allocation_tasks_notice";
 
-    DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+    DynamicAllocationTasksNotice(Callable<Integer> scaleAction, ServiceEmitter 
emitter)
     {
       this.scaleAction = scaleAction;
+      this.emitter = emitter;
     }
 
     /**
@@ -448,17 +452,35 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
               return;
             }
           }
+          final Integer desiredTaskCount = scaleAction.call();
+          ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
+              .setDimension(DruidMetrics.DATASOURCE, dataSource)
+              .setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
           if (nowTime - dynamicTriggerLastRunTime < 
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
             log.info(
-                "DynamicAllocationTasksNotice submitted again in [%d] millis, 
minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
+                "DynamicAllocationTasksNotice submitted again in [%d] millis, 
minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it! desired 
task count is [%s], active task count is [%s]",
                 nowTime - dynamicTriggerLastRunTime,
                 autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(),
-                dataSource
+                dataSource,
+                desiredTaskCount,
+                getActiveTaskGroupsCount()
             );
+
+            if (desiredTaskCount > 0) {
+              emitter.emit(event.setDimension(
+                                    AUTOSCALER_SKIP_REASON_DIMENSION,
+                                    "minTriggerScaleActionFrequencyMillis not 
elapsed yet"
+                                )
+                                .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, 
desiredTaskCount));
+            }
             return;
           }
-          final Integer desriedTaskCount = scaleAction.call();
-          boolean allocationSuccess = changeTaskCount(desriedTaskCount);
+
+          if (desiredTaskCount > 0) {
+            emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, 
desiredTaskCount));
+          }
+
+          boolean allocationSuccess = changeTaskCount(desiredTaskCount);
           if (allocationSuccess) {
             dynamicTriggerLastRunTime = nowTime;
           }
@@ -1208,9 +1230,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
-  public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction)
+  public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction, 
ServiceEmitter emitter)
   {
-    return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction));
+    return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, 
emitter));
   }
 
   private Runnable buildRunTask()
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index b7d4ba2f277..7b5f46195e7 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -167,7 +167,7 @@ public abstract class SeekableStreamSupervisorSpec 
implements SupervisorSpec
   {
     AutoScalerConfig autoScalerConfig = 
ingestionSchema.getIOConfig().getAutoScalerConfig();
     if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler() 
&& supervisor instanceof SeekableStreamSupervisor) {
-      return autoScalerConfig.createAutoScaler(supervisor, this);
+      return autoScalerConfig.createAutoScaler(supervisor, this, emitter);
     }
     return new NoopTaskAutoScaler();
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
index 53174a17bba..750e77328f6 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
@@ -26,6 +26,7 @@ import org.apache.druid.guice.annotations.UnstableApi;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 
 @UnstableApi
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", 
defaultImpl = LagBasedAutoScalerConfig.class)
@@ -38,6 +39,6 @@ public interface AutoScalerConfig
   long getMinTriggerScaleActionFrequencyMillis();
   int getTaskCountMax();
   int getTaskCountMin();
-  SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, 
SupervisorSpec spec);
+  SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, 
SupervisorSpec spec, ServiceEmitter emitter);
 }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index a06f358435f..8235f53e33f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -27,6 +27,9 @@ import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -45,11 +48,17 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
   private final SupervisorSpec spec;
   private final SeekableStreamSupervisor supervisor;
   private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
+  private final ServiceEmitter emitter;
+  private final ServiceMetricEvent.Builder metricBuilder;
 
   private static final ReentrantLock LOCK = new ReentrantLock(true);
 
-  public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String 
dataSource,
-      LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
+  public LagBasedAutoScaler(
+      SeekableStreamSupervisor supervisor,
+      String dataSource,
+      LagBasedAutoScalerConfig autoScalerConfig,
+      SupervisorSpec spec,
+      ServiceEmitter emitter
   )
   {
     this.lagBasedAutoScalerConfig = autoScalerConfig;
@@ -62,6 +71,10 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
     this.lagComputationExec = 
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + 
"-Computation-%d");
     this.spec = spec;
     this.supervisor = supervisor;
+    this.emitter = emitter;
+    metricBuilder = ServiceMetricEvent.builder()
+                                      .setDimension(DruidMetrics.DATASOURCE, 
dataSource)
+                                      .setDimension(DruidMetrics.STREAM, 
this.supervisor.getIoConfig().getStream());
   }
 
   @Override
@@ -93,7 +106,7 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
         TimeUnit.MILLISECONDS
     );
     allocationExec.scheduleAtFixedRate(
-        supervisor.buildDynamicAllocationTask(scaleAction),
+        supervisor.buildDynamicAllocationTask(scaleAction, emitter),
         lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + 
lagBasedAutoScalerConfig
             .getLagCollectionRangeMillis(),
         lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
@@ -214,6 +227,12 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
         log.warn("CurrentActiveTaskCount reached task count Max limit, 
skipping scale out action for dataSource [%s].",
             dataSource
         );
+        emitter.emit(metricBuilder
+                         .setDimension(
+                             
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
+                             "Already at max task count"
+                         )
+                         
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, 
taskCount));
         return -1;
       } else {
         desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax);
@@ -228,6 +247,12 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
         log.warn("CurrentActiveTaskCount reached task count Min limit, 
skipping scale in action for dataSource [%s].",
             dataSource
         );
+        emitter.emit(metricBuilder
+                         .setDimension(
+                             
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
+                             "Already at min task count"
+                         )
+                         
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, 
taskCount));
         return -1;
       } else {
         desiredActiveTaskCount = Math.max(taskCount, 
lagBasedAutoScalerConfig.getTaskCountMin());
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
index 0a4eb0b585e..e03242de279 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
@@ -25,6 +25,7 @@ import 
org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 
 import javax.annotation.Nullable;
 
@@ -154,9 +155,9 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
   }
 
   @Override
-  public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, 
SupervisorSpec spec)
+  public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, 
SupervisorSpec spec, ServiceEmitter emitter)
   {
-    return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor, 
spec.getId(), this, spec);
+    return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor, 
spec.getId(), this, spec, emitter);
   }
 
   @JsonProperty
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 12eb3c7e10a..3e0e46d7a03 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -57,6 +57,8 @@ import 
org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.TestHelper;
@@ -129,6 +131,8 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     monitorSchedulerConfig = EasyMock.mock(DruidMonitorSchedulerConfig.class);
     supervisorStateManagerConfig = 
EasyMock.mock(SupervisorStateManagerConfig.class);
     supervisor4 = EasyMock.mock(SeekableStreamSupervisor.class);
+
+    
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(null).anyTimes();
   }
 
   private abstract class BaseTestSeekableStreamSupervisor extends 
SeekableStreamSupervisor<String, String, ByteEntity>
@@ -618,9 +622,11 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
             .andReturn(mapper.convertValue(autoScalerConfig, 
AutoScalerConfig.class))
             .anyTimes();
+    
EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes();
     EasyMock.replay(seekableStreamSupervisorIOConfig);
 
     
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
+    
EasyMock.expect(supervisor4.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
     EasyMock.replay(supervisor4);
 
     TestSeekableStreamSupervisorSpec spec = new 
TestSeekableStreamSupervisorSpec(
@@ -691,8 +697,10 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
                 "1"
             ), AutoScalerConfig.class))
             .anyTimes();
+    
EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes();
     EasyMock.replay(seekableStreamSupervisorIOConfig);
 
+    
EasyMock.expect(supervisor4.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
     
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
     EasyMock.replay(supervisor4);
 
@@ -750,16 +758,19 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
     EasyMock.replay(taskMaster);
 
-    TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(3);
+    StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
+
+    TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(10);
 
     LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
         supervisor,
         DATASOURCE,
         mapper.convertValue(
-            getScaleOutProperties(2),
+            getScaleOutProperties(10),
             LagBasedAutoScalerConfig.class
         ),
-        spec
+        spec,
+        dynamicActionEmitter
     );
     supervisor.start();
     autoScaler.start();
@@ -769,6 +780,72 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     Thread.sleep(1000);
     int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(2, taskCountAfterScaleOut);
+    Assert.assertTrue(
+        dynamicActionEmitter
+            .getMetricEvents()
+            .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+            .stream()
+            .map(metric -> 
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
+            .filter(Objects::nonNull)
+            .anyMatch("minTriggerScaleActionFrequencyMillis not elapsed 
yet"::equals));
+    autoScaler.reset();
+    autoScaler.stop();
+  }
+
+  @Test
+  public void testSeekableStreamSupervisorSpecWithScaleOutAlreadyAtMax() 
throws InterruptedException
+  {
+    
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+    
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, 
true)).anyTimes();
+    
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    EasyMock.replay(spec);
+
+    
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.replay(taskMaster);
+
+    StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
+    TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(10)
+    {
+      @Override
+      public int getActiveTaskGroupsCount()
+      {
+        return 2;
+      }
+    };
+
+    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+        supervisor,
+        DATASOURCE,
+        mapper.convertValue(
+            getScaleOutProperties(2),
+            LagBasedAutoScalerConfig.class
+        ),
+        spec,
+        dynamicActionEmitter
+    );
+    supervisor.start();
+    autoScaler.start();
+    supervisor.runInternal();
+    Thread.sleep(1000);
+
+    Assert.assertTrue(
+        dynamicActionEmitter
+            .getMetricEvents()
+            .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+            .stream()
+            .map(metric -> 
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
+            .filter(Objects::nonNull)
+            .anyMatch("Already at max task count"::equals));
 
     autoScaler.reset();
     autoScaler.stop();
@@ -807,7 +884,8 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
             getScaleOutProperties(2),
             LagBasedAutoScalerConfig.class
         ),
-        spec
+        spec,
+        emitter
     );
     supervisor.start();
     autoScaler.start();
@@ -851,7 +929,8 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
             getScaleOutProperties(3),
             LagBasedAutoScalerConfig.class
         ),
-        spec
+        spec,
+        emitter
     );
     supervisor.start();
     autoScaler.start();
@@ -870,7 +949,6 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
   public void testSeekableStreamSupervisorSpecWithScaleIn() throws 
InterruptedException
   {
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
-
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
     EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, 
false)).anyTimes();
     
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
@@ -895,7 +973,8 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
             getScaleInProperties(),
             LagBasedAutoScalerConfig.class
         ),
-        spec
+        spec,
+        emitter
     );
 
     // enable autoscaler so that taskcount config will be ignored and init 
value of taskCount will use taskCountMin.
@@ -914,6 +993,65 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     autoScaler.stop();
   }
 
+  @Test
+  public void testSeekableStreamSupervisorSpecWithScaleInAlreadyAtMin() throws 
InterruptedException
+  {
+    
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+    
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, 
true)).anyTimes();
+    
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    EasyMock.replay(spec);
+
+    
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.replay(taskMaster);
+
+    StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
+    TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(10)
+    {
+      @Override
+      public int getActiveTaskGroupsCount()
+      {
+        return 1;
+      }
+    };
+
+    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+        supervisor,
+        DATASOURCE,
+        mapper.convertValue(
+            getScaleInProperties(),
+            LagBasedAutoScalerConfig.class
+        ),
+        spec,
+        dynamicActionEmitter
+    );
+    supervisor.start();
+    autoScaler.start();
+    supervisor.runInternal();
+    Thread.sleep(1000);
+
+    Assert.assertTrue(
+        dynamicActionEmitter
+            .getMetricEvents()
+            .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+            .stream()
+            .map(metric -> 
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
+            .filter(Objects::nonNull)
+            .anyMatch("Already at min task count"::equals));
+
+    autoScaler.reset();
+    autoScaler.stop();
+  }
+
   @Test
   public void testSeekableStreamSupervisorSpecWithScaleDisable() throws 
InterruptedException
   {
@@ -1185,7 +1323,7 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
   {
     HashMap<String, Object> autoScalerConfig = new HashMap<>();
     autoScalerConfig.put("enableTaskAutoScaler", true);
-    autoScalerConfig.put("lagCollectionIntervalMillis", 500);
+    autoScalerConfig.put("lagCollectionIntervalMillis", 50);
     autoScalerConfig.put("lagCollectionRangeMillis", 500);
     autoScalerConfig.put("scaleOutThreshold", 0);
     autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to