This is an automated email from the ASF dual-hosted git repository.
georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a65b2d4f41e Visibility into LagBased AutoScaler desired task count (#16199)
a65b2d4f41e is described below
commit a65b2d4f41e495a3f032cf3f96dc01005d9d2025
Author: Adithya Chakilam <[email protected]>
AuthorDate: Wed Mar 27 12:08:00 2024 -0500
Visibility into LagBased AutoScaler desired task count (#16199)
* Visibility into skipped scale notices
* comments
* change to emit always instead of just skips
* fix failing test
* comments
* Add couple more tests
---
docs/operations/metrics.md | 1 +
.../supervisor/SeekableStreamSupervisor.java | 36 ++++-
.../supervisor/SeekableStreamSupervisorSpec.java | 2 +-
.../supervisor/autoscaler/AutoScalerConfig.java | 3 +-
.../supervisor/autoscaler/LagBasedAutoScaler.java | 31 ++++-
.../autoscaler/LagBasedAutoScalerConfig.java | 5 +-
.../SeekableStreamSupervisorSpecTest.java | 154 +++++++++++++++++++--
7 files changed, 210 insertions(+), 22 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 0c7a492d807..e49b9600181 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -258,6 +258,7 @@ batch ingestion emit the following metrics. These metrics
are deltas for each em
|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s |
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.|
+|`task/autoScaler/requiredCount`|Count of required tasks based on the calculations of `lagBased` auto scaler.|`dataSource`, `stream`, `scalingSkipReason`|Depends on auto scaler config.|
If the JVM does not support CPU time measurement for the current thread,
`ingest/merge/cpu` and `ingest/persists/cpu` will be 0.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 507001a7af6..59cd05bf349 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -149,6 +149,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
implements Supervisor
{
public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
+ public static final String AUTOSCALER_SKIP_REASON_DIMENSION =
"scalingSkipReason";
+ public static final String AUTOSCALER_REQUIRED_TASKS_METRIC =
"task/autoScaler/requiredCount";
private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
@@ -403,11 +405,13 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private class DynamicAllocationTasksNotice implements Notice
{
Callable<Integer> scaleAction;
+ ServiceEmitter emitter;
private static final String TYPE = "dynamic_allocation_tasks_notice";
- DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+ DynamicAllocationTasksNotice(Callable<Integer> scaleAction, ServiceEmitter
emitter)
{
this.scaleAction = scaleAction;
+ this.emitter = emitter;
}
/**
@@ -448,17 +452,35 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return;
}
}
+ final Integer desiredTaskCount = scaleAction.call();
+ ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
if (nowTime - dynamicTriggerLastRunTime <
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
log.info(
- "DynamicAllocationTasksNotice submitted again in [%d] millis,
minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
+ "DynamicAllocationTasksNotice submitted again in [%d] millis,
minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it! desired
task count is [%s], active task count is [%s]",
nowTime - dynamicTriggerLastRunTime,
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(),
- dataSource
+ dataSource,
+ desiredTaskCount,
+ getActiveTaskGroupsCount()
);
+
+ if (desiredTaskCount > 0) {
+ emitter.emit(event.setDimension(
+ AUTOSCALER_SKIP_REASON_DIMENSION,
+ "minTriggerScaleActionFrequencyMillis not
elapsed yet"
+ )
+ .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC,
desiredTaskCount));
+ }
return;
}
- final Integer desriedTaskCount = scaleAction.call();
- boolean allocationSuccess = changeTaskCount(desriedTaskCount);
+
+ if (desiredTaskCount > 0) {
+ emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC,
desiredTaskCount));
+ }
+
+ boolean allocationSuccess = changeTaskCount(desiredTaskCount);
if (allocationSuccess) {
dynamicTriggerLastRunTime = nowTime;
}
@@ -1208,9 +1230,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
- public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction)
+ public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction,
ServiceEmitter emitter)
{
- return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction));
+ return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction,
emitter));
}
private Runnable buildRunTask()
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index b7d4ba2f277..7b5f46195e7 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -167,7 +167,7 @@ public abstract class SeekableStreamSupervisorSpec
implements SupervisorSpec
{
AutoScalerConfig autoScalerConfig =
ingestionSchema.getIOConfig().getAutoScalerConfig();
if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler()
&& supervisor instanceof SeekableStreamSupervisor) {
- return autoScalerConfig.createAutoScaler(supervisor, this);
+ return autoScalerConfig.createAutoScaler(supervisor, this, emitter);
}
return new NoopTaskAutoScaler();
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
index 53174a17bba..750e77328f6 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
@@ -26,6 +26,7 @@ import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@UnstableApi
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy",
defaultImpl = LagBasedAutoScalerConfig.class)
@@ -38,6 +39,6 @@ public interface AutoScalerConfig
long getMinTriggerScaleActionFrequencyMillis();
int getTaskCountMax();
int getTaskCountMin();
- SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor,
SupervisorSpec spec);
+ SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor,
SupervisorSpec spec, ServiceEmitter emitter);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index a06f358435f..8235f53e33f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -27,6 +27,9 @@ import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
import java.util.ArrayList;
import java.util.List;
@@ -45,11 +48,17 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
private final SupervisorSpec spec;
private final SeekableStreamSupervisor supervisor;
private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
+ private final ServiceEmitter emitter;
+ private final ServiceMetricEvent.Builder metricBuilder;
private static final ReentrantLock LOCK = new ReentrantLock(true);
- public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String
dataSource,
- LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
+ public LagBasedAutoScaler(
+ SeekableStreamSupervisor supervisor,
+ String dataSource,
+ LagBasedAutoScalerConfig autoScalerConfig,
+ SupervisorSpec spec,
+ ServiceEmitter emitter
)
{
this.lagBasedAutoScalerConfig = autoScalerConfig;
@@ -62,6 +71,10 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
this.lagComputationExec =
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) +
"-Computation-%d");
this.spec = spec;
this.supervisor = supervisor;
+ this.emitter = emitter;
+ metricBuilder = ServiceMetricEvent.builder()
+ .setDimension(DruidMetrics.DATASOURCE,
dataSource)
+ .setDimension(DruidMetrics.STREAM,
this.supervisor.getIoConfig().getStream());
}
@Override
@@ -93,7 +106,7 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
TimeUnit.MILLISECONDS
);
allocationExec.scheduleAtFixedRate(
- supervisor.buildDynamicAllocationTask(scaleAction),
+ supervisor.buildDynamicAllocationTask(scaleAction, emitter),
lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() +
lagBasedAutoScalerConfig
.getLagCollectionRangeMillis(),
lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
@@ -214,6 +227,12 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
log.warn("CurrentActiveTaskCount reached task count Max limit,
skipping scale out action for dataSource [%s].",
dataSource
);
+ emitter.emit(metricBuilder
+ .setDimension(
+
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
+ "Already at max task count"
+ )
+
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC,
taskCount));
return -1;
} else {
desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax);
@@ -228,6 +247,12 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
log.warn("CurrentActiveTaskCount reached task count Min limit,
skipping scale in action for dataSource [%s].",
dataSource
);
+ emitter.emit(metricBuilder
+ .setDimension(
+
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
+ "Already at min task count"
+ )
+
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC,
taskCount));
return -1;
} else {
desiredActiveTaskCount = Math.max(taskCount,
lagBasedAutoScalerConfig.getTaskCountMin());
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
index 0a4eb0b585e..e03242de279 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
@@ -25,6 +25,7 @@ import
org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import javax.annotation.Nullable;
@@ -154,9 +155,9 @@ public class LagBasedAutoScalerConfig implements
AutoScalerConfig
}
@Override
- public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor,
SupervisorSpec spec)
+ public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor,
SupervisorSpec spec, ServiceEmitter emitter)
{
- return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor,
spec.getId(), this, spec);
+ return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor,
spec.getId(), this, spec, emitter);
}
@JsonProperty
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 12eb3c7e10a..3e0e46d7a03 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -57,6 +57,8 @@ import
org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
@@ -129,6 +131,8 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
monitorSchedulerConfig = EasyMock.mock(DruidMonitorSchedulerConfig.class);
supervisorStateManagerConfig =
EasyMock.mock(SupervisorStateManagerConfig.class);
supervisor4 = EasyMock.mock(SeekableStreamSupervisor.class);
+
+
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(null).anyTimes();
}
private abstract class BaseTestSeekableStreamSupervisor extends
SeekableStreamSupervisor<String, String, ByteEntity>
@@ -618,9 +622,11 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
.andReturn(mapper.convertValue(autoScalerConfig,
AutoScalerConfig.class))
.anyTimes();
+
EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
+
EasyMock.expect(supervisor4.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.replay(supervisor4);
TestSeekableStreamSupervisorSpec spec = new
TestSeekableStreamSupervisorSpec(
@@ -691,8 +697,10 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
"1"
), AutoScalerConfig.class))
.anyTimes();
+
EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
+
EasyMock.expect(supervisor4.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
EasyMock.replay(supervisor4);
@@ -750,16 +758,19 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster);
- TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(3);
+ StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
+
+ TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(10);
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
DATASOURCE,
mapper.convertValue(
- getScaleOutProperties(2),
+ getScaleOutProperties(10),
LagBasedAutoScalerConfig.class
),
- spec
+ spec,
+ dynamicActionEmitter
);
supervisor.start();
autoScaler.start();
@@ -769,6 +780,72 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
Thread.sleep(1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScaleOut);
+ Assert.assertTrue(
+ dynamicActionEmitter
+ .getMetricEvents()
+ .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+ .stream()
+ .map(metric ->
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
+ .filter(Objects::nonNull)
+ .anyMatch("minTriggerScaleActionFrequencyMillis not elapsed
yet"::equals));
+ autoScaler.reset();
+ autoScaler.stop();
+ }
+
+ @Test
+ public void testSeekableStreamSupervisorSpecWithScaleOutAlreadyAtMax()
throws InterruptedException
+ {
+
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2,
true)).anyTimes();
+
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.replay(spec);
+
+
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.replay(taskMaster);
+
+ StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
+ TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(10)
+ {
+ @Override
+ public int getActiveTaskGroupsCount()
+ {
+ return 2;
+ }
+ };
+
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+ supervisor,
+ DATASOURCE,
+ mapper.convertValue(
+ getScaleOutProperties(2),
+ LagBasedAutoScalerConfig.class
+ ),
+ spec,
+ dynamicActionEmitter
+ );
+ supervisor.start();
+ autoScaler.start();
+ supervisor.runInternal();
+ Thread.sleep(1000);
+
+ Assert.assertTrue(
+ dynamicActionEmitter
+ .getMetricEvents()
+ .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+ .stream()
+ .map(metric ->
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
+ .filter(Objects::nonNull)
+ .anyMatch("Already at max task count"::equals));
autoScaler.reset();
autoScaler.stop();
@@ -807,7 +884,8 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
getScaleOutProperties(2),
LagBasedAutoScalerConfig.class
),
- spec
+ spec,
+ emitter
);
supervisor.start();
autoScaler.start();
@@ -851,7 +929,8 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
getScaleOutProperties(3),
LagBasedAutoScalerConfig.class
),
- spec
+ spec,
+ emitter
);
supervisor.start();
autoScaler.start();
@@ -870,7 +949,6 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
public void testSeekableStreamSupervisorSpecWithScaleIn() throws
InterruptedException
{
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
-
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2,
false)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
@@ -895,7 +973,8 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
getScaleInProperties(),
LagBasedAutoScalerConfig.class
),
- spec
+ spec,
+ emitter
);
// enable autoscaler so that taskcount config will be ignored and init
value of taskCount will use taskCountMin.
@@ -914,6 +993,65 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
autoScaler.stop();
}
+ @Test
+ public void testSeekableStreamSupervisorSpecWithScaleInAlreadyAtMin() throws
InterruptedException
+ {
+
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1,
true)).anyTimes();
+
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.replay(spec);
+
+
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.replay(taskMaster);
+
+ StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
+ TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(10)
+ {
+ @Override
+ public int getActiveTaskGroupsCount()
+ {
+ return 1;
+ }
+ };
+
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+ supervisor,
+ DATASOURCE,
+ mapper.convertValue(
+ getScaleInProperties(),
+ LagBasedAutoScalerConfig.class
+ ),
+ spec,
+ dynamicActionEmitter
+ );
+ supervisor.start();
+ autoScaler.start();
+ supervisor.runInternal();
+ Thread.sleep(1000);
+
+ Assert.assertTrue(
+ dynamicActionEmitter
+ .getMetricEvents()
+ .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+ .stream()
+ .map(metric ->
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
+ .filter(Objects::nonNull)
+ .anyMatch("Already at min task count"::equals));
+
+ autoScaler.reset();
+ autoScaler.stop();
+ }
+
@Test
public void testSeekableStreamSupervisorSpecWithScaleDisable() throws
InterruptedException
{
@@ -1185,7 +1323,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
{
HashMap<String, Object> autoScalerConfig = new HashMap<>();
autoScalerConfig.put("enableTaskAutoScaler", true);
- autoScalerConfig.put("lagCollectionIntervalMillis", 500);
+ autoScalerConfig.put("lagCollectionIntervalMillis", 50);
autoScalerConfig.put("lagCollectionRangeMillis", 500);
autoScalerConfig.put("scaleOutThreshold", 0);
autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]