This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2efb74f fix supervisor auto scaler config serde bug (#12317)
2efb74f is described below
commit 2efb74ff1e4a9921e236a21475978822be57fe11
Author: Parag Jain <[email protected]>
AuthorDate: Thu Mar 10 05:47:12 2022 +0530
fix supervisor auto scaler config serde bug (#12317)
---
.../kafka/supervisor/KafkaSupervisorIOConfig.java | 2 +-
.../supervisor/KafkaSupervisorIOConfigTest.java | 57 +++
.../supervisor/KinesisSupervisorIOConfig.java | 2 +-
.../kinesis/supervisor/KinesisSupervisorTest.java | 4 +-
.../supervisor/SeekableStreamSupervisor.java | 2 +-
.../SeekableStreamSupervisorIOConfig.java | 2 +-
.../supervisor/SeekableStreamSupervisorSpec.java | 2 +-
.../SeekableStreamSupervisorSpecTest.java | 465 ++++++++++++---------
8 files changed, 326 insertions(+), 210 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index 87b689e..0625ead 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -121,7 +121,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
", taskCount=" + getTaskCount() +
", taskDuration=" + getTaskDuration() +
", consumerProperties=" + consumerProperties +
- ", autoScalerConfig=" + getAutoscalerConfig() +
+ ", autoScalerConfig=" + getAutoScalerConfig() +
", pollTimeout=" + pollTimeout +
", startDelay=" + getStartDelay() +
", period=" + getPeriod() +
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 5086689..e503d4f 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -19,21 +19,27 @@
package org.apache.druid.indexing.kafka.supervisor;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.hamcrest.CoreMatchers;
import org.joda.time.Duration;
+import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
public class KafkaSupervisorIOConfigTest
@@ -260,4 +266,55 @@ public class KafkaSupervisorIOConfigTest
), KafkaSupervisorIOConfig.class
);
}
+
+ @Test
+ public void testAutoScalingConfigSerde() throws JsonProcessingException
+ {
+ HashMap<String, Object> autoScalerConfig = new HashMap<>();
+ autoScalerConfig.put("enableTaskAutoScaler", true);
+ autoScalerConfig.put("lagCollectionIntervalMillis", 500);
+ autoScalerConfig.put("lagCollectionRangeMillis", 500);
+ autoScalerConfig.put("scaleOutThreshold", 0);
+ autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0);
+ autoScalerConfig.put("scaleInThreshold", 1000000);
+ autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
+ autoScalerConfig.put("scaleActionStartDelayMillis", 0);
+ autoScalerConfig.put("scaleActionPeriodMillis", 100);
+ autoScalerConfig.put("taskCountMax", 2);
+ autoScalerConfig.put("taskCountMin", 1);
+ autoScalerConfig.put("scaleInStep", 1);
+ autoScalerConfig.put("scaleOutStep", 2);
+ autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+
+ final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
+ consumerProperties.put("bootstrap.servers", "localhost:8082");
+
+ KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
+ "test",
+ null,
+ 1,
+ 1,
+ new Period("PT1H"),
+ consumerProperties,
+ mapper.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ new Period("P1D"),
+ new Period("PT30S"),
+ true,
+ new Period("PT30M"),
+ null,
+ null,
+ null
+ );
+ String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
+ KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);
+ Assert.assertNotNull(kafkaSupervisorIOConfig1.getAutoScalerConfig());
+ Assert.assertTrue(kafkaSupervisorIOConfig1.getAutoScalerConfig().getEnableTaskAutoScaler());
+ Assert.assertEquals(1, kafkaSupervisorIOConfig1.getAutoScalerConfig().getTaskCountMin());
+ Assert.assertEquals(2, kafkaSupervisorIOConfig1.getAutoScalerConfig().getTaskCountMax());
+ Assert.assertEquals(
+ 1200000,
+ kafkaSupervisorIOConfig1.getAutoScalerConfig().getMinTriggerScaleActionFrequencyMillis()
+ );
+ }
}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index 41ae876..220c22f 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -151,7 +151,7 @@ public class KinesisSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
", endpoint='" + endpoint + '\'' +
", replicas=" + getReplicas() +
", taskCount=" + getTaskCount() +
- ", autoScalerConfig=" + getAutoscalerConfig() +
+ ", autoScalerConfig=" + getAutoScalerConfig() +
", taskDuration=" + getTaskDuration() +
", startDelay=" + getStartDelay() +
", period=" + getPeriod() +
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 02b5bd2..3bfe6dd 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -531,7 +531,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
false
);
- AutoScalerConfig autoscalerConfigNull = kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoscalerConfig();
+ AutoScalerConfig autoscalerConfigNull = kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoScalerConfig();
Assert.assertNull(autoscalerConfigNull);
// create KinesisSupervisorIOConfig with autoScalerConfig Empty
@@ -558,7 +558,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
false
);
- AutoScalerConfig autoscalerConfig = kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoscalerConfig();
+ AutoScalerConfig autoscalerConfig = kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoScalerConfig();
Assert.assertNotNull(autoscalerConfig);
Assert.assertTrue(autoscalerConfig instanceof LagBasedAutoScalerConfig);
Assert.assertFalse(autoscalerConfig.getEnableTaskAutoScaler());
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 86579a0..e4445f2 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -765,7 +765,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.useExclusiveStartingSequence = useExclusiveStartingSequence;
this.dataSource = spec.getDataSchema().getDataSource();
this.ioConfig = spec.getIoConfig();
- this.autoScalerConfig = ioConfig.getAutoscalerConfig();
+ this.autoScalerConfig = ioConfig.getAutoScalerConfig();
this.tuningConfig = spec.getTuningConfig();
this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
this.supervisorId = supervisorId;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 3ed55ec..6ddb018 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -125,7 +125,7 @@ public abstract class SeekableStreamSupervisorIOConfig
@Nullable
@JsonProperty
- public AutoScalerConfig getAutoscalerConfig()
+ public AutoScalerConfig getAutoScalerConfig()
{
return autoScalerConfig;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index ff1d317..ac985ac 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -162,7 +162,7 @@ public abstract class SeekableStreamSupervisorSpec
implements SupervisorSpec
@Override
public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
{
- AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoscalerConfig();
+ AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoScalerConfig();
if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler()
&& supervisor instanceof SeekableStreamSupervisor) {
return autoScalerConfig.createAutoScaler(supervisor, this);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index e2142a1..7cdbb74 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -133,15 +133,15 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
private BaseTestSeekableStreamSupervisor()
{
super(
- "testSupervisorId",
- taskStorage,
- taskMaster,
- indexerMetadataStorageCoordinator,
- taskClientFactory,
- OBJECT_MAPPER,
- spec,
- rowIngestionMetersFactory,
- false
+ "testSupervisorId",
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ OBJECT_MAPPER,
+ spec,
+ rowIngestionMetersFactory,
+ false
);
}
@@ -154,7 +154,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
@Override
protected void updatePartitionLagFromStream()
{
- // do nothing
+ // do nothing
}
@Nullable
@@ -173,25 +173,25 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
@Override
protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
- int groupId,
- Map<String, String> startPartitions,
- Map<String, String> endPartitions,
- String baseSequenceName,
- DateTime minimumMessageTime,
- DateTime maximumMessageTime,
- Set<String> exclusiveStartSequenceNumberPartitions,
- SeekableStreamSupervisorIOConfig ioConfig
+ int groupId,
+ Map<String, String> startPartitions,
+ Map<String, String> endPartitions,
+ String baseSequenceName,
+ DateTime minimumMessageTime,
+ DateTime maximumMessageTime,
+ Set<String> exclusiveStartSequenceNumberPartitions,
+ SeekableStreamSupervisorIOConfig ioConfig
)
{
return new SeekableStreamIndexTaskIOConfig<String, String>(
- groupId,
- baseSequenceName,
- new SeekableStreamStartSequenceNumbers<>(STREAM,
startPartitions, exclusiveStartSequenceNumberPartitions),
- new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
- true,
- minimumMessageTime,
- maximumMessageTime,
- ioConfig.getInputFormat()
+ groupId,
+ baseSequenceName,
+ new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions,
exclusiveStartSequenceNumberPartitions),
+ new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
+ true,
+ minimumMessageTime,
+ maximumMessageTime,
+ ioConfig.getInputFormat()
)
{
};
@@ -199,13 +199,13 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
@Override
protected List<SeekableStreamIndexTask<String, String, ByteEntity>>
createIndexTasks(
- int replicas,
- String baseSequenceName,
- ObjectMapper sortingMapper,
- TreeMap<Integer, Map<String, String>> sequenceOffsets,
- SeekableStreamIndexTaskIOConfig taskIoConfig,
- SeekableStreamIndexTaskTuningConfig taskTuningConfig,
- RowIngestionMetersFactory rowIngestionMetersFactory
+ int replicas,
+ String baseSequenceName,
+ ObjectMapper sortingMapper,
+ TreeMap<Integer, Map<String, String>> sequenceOffsets,
+ SeekableStreamIndexTaskIOConfig taskIoConfig,
+ SeekableStreamIndexTaskTuningConfig taskTuningConfig,
+ RowIngestionMetersFactory rowIngestionMetersFactory
)
{
return null;
@@ -231,8 +231,8 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
@Override
protected SeekableStreamDataSourceMetadata<String, String>
createDataSourceMetaDataForReset(
- String stream,
- Map<String, String> map
+ String stream,
+ Map<String, String> map
)
{
return null;
@@ -271,27 +271,27 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
@Override
protected SeekableStreamSupervisorReportPayload<String, String>
createReportPayload(
- int numPartitions,
- boolean includeOffsets
+ int numPartitions,
+ boolean includeOffsets
)
{
return new SeekableStreamSupervisorReportPayload<String, String>(
- DATASOURCE,
- STREAM,
- 1,
- 1,
- 1L,
- null,
- null,
- null,
- null,
- null,
- null,
- false,
- true,
- null,
- null,
- null
+ DATASOURCE,
+ STREAM,
+ 1,
+ 1,
+ 1L,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ false,
+ true,
+ null,
+ null,
+ null
)
{
};
@@ -340,7 +340,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
@Override
protected void scheduleReporting(ScheduledExecutorService reportingExec)
{
- // do nothing
+ // do nothing
}
@Override
@@ -362,34 +362,37 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
private SeekableStreamSupervisor supervisor;
private String id;
- public
TestSeekableStreamSupervisorSpec(SeekableStreamSupervisorIngestionSpec
ingestionSchema,
- @Nullable Map<String, Object>
context,
- Boolean suspended,
- TaskStorage taskStorage,
- TaskMaster taskMaster,
- IndexerMetadataStorageCoordinator
indexerMetadataStorageCoordinator,
-
SeekableStreamIndexTaskClientFactory indexTaskClientFactory,
- ObjectMapper mapper,
- ServiceEmitter emitter,
- DruidMonitorSchedulerConfig
monitorSchedulerConfig,
- RowIngestionMetersFactory
rowIngestionMetersFactory,
- SupervisorStateManagerConfig
supervisorStateManagerConfig,
- SeekableStreamSupervisor
supervisor,
- String id)
+ public TestSeekableStreamSupervisorSpec(
+ SeekableStreamSupervisorIngestionSpec ingestionSchema,
+ @Nullable Map<String, Object> context,
+ Boolean suspended,
+ TaskStorage taskStorage,
+ TaskMaster taskMaster,
+ IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
+ SeekableStreamIndexTaskClientFactory indexTaskClientFactory,
+ ObjectMapper mapper,
+ ServiceEmitter emitter,
+ DruidMonitorSchedulerConfig monitorSchedulerConfig,
+ RowIngestionMetersFactory rowIngestionMetersFactory,
+ SupervisorStateManagerConfig supervisorStateManagerConfig,
+ SeekableStreamSupervisor supervisor,
+ String id
+ )
{
super(
- ingestionSchema,
- context,
- suspended,
- taskStorage,
- taskMaster,
- indexerMetadataStorageCoordinator,
- indexTaskClientFactory,
- mapper,
- emitter,
- monitorSchedulerConfig,
- rowIngestionMetersFactory,
- supervisorStateManagerConfig);
+ ingestionSchema,
+ context,
+ suspended,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig
+ );
this.supervisor = supervisor;
this.id = id;
@@ -482,26 +485,26 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
{
return new SeekableStreamIndexTaskTuningConfig(
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
)
{
@Override
@@ -530,17 +533,30 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
AutoScalerConfig autoScalerConfigNull = mapper.convertValue(null,
AutoScalerConfig.class);
Assert.assertNull(autoScalerConfigNull);
- AutoScalerConfig autoScalerConfigDefault =
mapper.convertValue(ImmutableMap.of("autoScalerStrategy", "lagBased"),
AutoScalerConfig.class);
+ AutoScalerConfig autoScalerConfigDefault = mapper.convertValue(
+ ImmutableMap.of("autoScalerStrategy", "lagBased"),
+ AutoScalerConfig.class
+ );
Assert.assertTrue(autoScalerConfigDefault instanceof
LagBasedAutoScalerConfig);
- AutoScalerConfig autoScalerConfigValue =
mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1"),
AutoScalerConfig.class);
+ AutoScalerConfig autoScalerConfigValue = mapper.convertValue(
+ ImmutableMap.of("lagCollectionIntervalMillis", "1"),
+ AutoScalerConfig.class
+ );
Assert.assertTrue(autoScalerConfigValue instanceof
LagBasedAutoScalerConfig);
LagBasedAutoScalerConfig lagBasedAutoScalerConfig =
(LagBasedAutoScalerConfig) autoScalerConfigValue;
Assert.assertEquals(lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
1);
Exception e = null;
try {
- AutoScalerConfig autoScalerError =
mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true",
"taskCountMax", "1", "taskCountMin", "4"), AutoScalerConfig.class);
+ AutoScalerConfig autoScalerError = mapper.convertValue(ImmutableMap.of(
+ "enableTaskAutoScaler",
+ "true",
+ "taskCountMax",
+ "1",
+ "taskCountMin",
+ "4"
+ ), AutoScalerConfig.class);
}
catch (RuntimeException ex) {
e = ex;
@@ -550,14 +566,15 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
e = null;
try {
// taskCountMax and taskCountMin couldn't be ignored.
- AutoScalerConfig autoScalerError2 =
mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true"),
AutoScalerConfig.class);
+ AutoScalerConfig autoScalerError2 = mapper.convertValue(
+ ImmutableMap.of("enableTaskAutoScaler", "true"),
+ AutoScalerConfig.class
+ );
}
catch (RuntimeException ex) {
e = ex;
}
Assert.assertNotNull(e);
-
-
}
@Test
@@ -584,51 +601,60 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
EasyMock.replay(ingestionSchema);
-
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig,
AutoScalerConfig.class)).anyTimes();
+ EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
+ .andReturn(mapper.convertValue(autoScalerConfig,
AutoScalerConfig.class))
+ .anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
EasyMock.replay(supervisor4);
- TestSeekableStreamSupervisorSpec spec = new
TestSeekableStreamSupervisorSpec(ingestionSchema,
- null,
- false,
- taskStorage,
- taskMaster,
- indexerMetadataStorageCoordinator,
- indexTaskClientFactory,
- mapper,
- emitter,
- monitorSchedulerConfig,
- rowIngestionMetersFactory,
- supervisorStateManagerConfig,
- supervisor4,
- "id1");
+ TestSeekableStreamSupervisorSpec spec = new
TestSeekableStreamSupervisorSpec(
+ ingestionSchema,
+ null,
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig,
+ supervisor4,
+ "id1"
+ );
SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4);
Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler);
EasyMock.reset(seekableStreamSupervisorIOConfig);
autoScalerConfig.put("enableTaskAutoScaler", false);
-
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig,
AutoScalerConfig.class)).anyTimes();
+ EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
+ .andReturn(mapper.convertValue(autoScalerConfig,
AutoScalerConfig.class))
+ .anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
SupervisorTaskAutoScaler autoscaler2 = spec.createAutoscaler(supervisor4);
Assert.assertTrue(autoscaler2 instanceof NoopTaskAutoScaler);
EasyMock.reset(seekableStreamSupervisorIOConfig);
autoScalerConfig.remove("enableTaskAutoScaler");
-
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig,
AutoScalerConfig.class)).anyTimes();
+ EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
+ .andReturn(mapper.convertValue(autoScalerConfig,
AutoScalerConfig.class))
+ .anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
SupervisorTaskAutoScaler autoscaler3 = spec.createAutoscaler(supervisor4);
Assert.assertTrue(autoscaler3 instanceof NoopTaskAutoScaler);
EasyMock.reset(seekableStreamSupervisorIOConfig);
autoScalerConfig.clear();
-
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig,
AutoScalerConfig.class)).anyTimes();
+ EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
+ .andReturn(mapper.convertValue(autoScalerConfig,
AutoScalerConfig.class))
+ .anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
Assert.assertTrue(autoScalerConfig.isEmpty());
SupervisorTaskAutoScaler autoscaler4 = spec.createAutoscaler(supervisor4);
Assert.assertTrue(autoscaler4 instanceof NoopTaskAutoScaler);
-
}
@Test
@@ -639,26 +665,38 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
EasyMock.replay(ingestionSchema);
-
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis",
"1", "enableTaskAutoScaler", true, "taskCountMax", "4", "taskCountMin", "1"),
AutoScalerConfig.class)).anyTimes();
+ EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
+
.andReturn(mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis",
+ "1",
+
"enableTaskAutoScaler",
+ true,
+ "taskCountMax",
+ "4",
+ "taskCountMin",
+ "1"
+ ), AutoScalerConfig.class))
+ .anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
EasyMock.replay(supervisor4);
- TestSeekableStreamSupervisorSpec spec = new
TestSeekableStreamSupervisorSpec(ingestionSchema,
- null,
- false,
- taskStorage,
- taskMaster,
- indexerMetadataStorageCoordinator,
- indexTaskClientFactory,
- mapper,
- emitter,
- monitorSchedulerConfig,
- rowIngestionMetersFactory,
- supervisorStateManagerConfig,
- supervisor4,
- "id1");
+ TestSeekableStreamSupervisorSpec spec = new
TestSeekableStreamSupervisorSpec(
+ ingestionSchema,
+ null,
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig,
+ supervisor4,
+ "id1"
+ );
SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4);
Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler);
LagBasedAutoScaler lagBasedAutoScaler = (LagBasedAutoScaler) autoscaler;
@@ -679,7 +717,6 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
@Test
public void testSeekableStreamSupervisorSpecWithScaleOut() throws
InterruptedException
{
-
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
@@ -700,27 +737,31 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(3);
- LagStats lagStats = supervisor.computeLagStats();
-
- LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor,
DATASOURCE, mapper.convertValue(getScaleOutProperties(2),
LagBasedAutoScalerConfig.class), spec);
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+ supervisor,
+ DATASOURCE,
+ mapper.convertValue(
+ getScaleOutProperties(2),
+ LagBasedAutoScalerConfig.class
+ ),
+ spec
+ );
supervisor.start();
autoScaler.start();
supervisor.runInternal();
int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountBeforeScaleOut);
- Thread.sleep(1 * 1000);
+ Thread.sleep(1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScaleOut);
autoScaler.reset();
autoScaler.stop();
-
}
@Test
public void
testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() throws
InterruptedException
{
-
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
@@ -740,28 +781,31 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
EasyMock.replay(taskMaster);
TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(2);
-
- LagStats lagStats = supervisor.computeLagStats();
-
- LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor,
DATASOURCE, mapper.convertValue(getScaleOutProperties(3),
LagBasedAutoScalerConfig.class), spec);
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+ supervisor,
+ DATASOURCE,
+ mapper.convertValue(
+ getScaleOutProperties(3),
+ LagBasedAutoScalerConfig.class
+ ),
+ spec
+ );
supervisor.start();
autoScaler.start();
supervisor.runInternal();
int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountBeforeScaleOut);
- Thread.sleep(1 * 1000);
+ Thread.sleep(1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScaleOut);
autoScaler.reset();
autoScaler.stop();
-
}
@Test
public void testSeekableStreamSupervisorSpecWithScaleIn() throws
InterruptedException
{
-
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
@@ -781,7 +825,15 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
EasyMock.replay(taskMaster);
TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(3);
- LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor,
DATASOURCE, mapper.convertValue(getScaleInProperties(),
LagBasedAutoScalerConfig.class), spec);
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+ supervisor,
+ DATASOURCE,
+ mapper.convertValue(
+ getScaleInProperties(),
+ LagBasedAutoScalerConfig.class
+ ),
+ spec
+ );
// enable autoscaler so that taskcount config will be ignored and init
value of taskCount will use taskCountMin.
Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
@@ -791,7 +843,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
supervisor.runInternal();
int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountBeforeScaleOut);
- Thread.sleep(1 * 1000);
+ Thread.sleep(1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountAfterScaleOut);
@@ -802,20 +854,21 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
@Test
public void testSeekableStreamSupervisorSpecWithScaleDisable() throws
InterruptedException
{
-
SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig = new
SeekableStreamSupervisorIOConfig(
- "stream",
- new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false),
- 1,
- 1,
- new Period("PT1H"),
- new Period("P1D"),
- new Period("PT30S"),
- false,
- new Period("PT30M"),
- null,
- null, null, null
- ) {};
+ "stream",
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false),
+ 1,
+ 1,
+ new Period("PT1H"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null, null, null
+ )
+ {
+ };
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
@@ -856,16 +909,16 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
dimensions.add(StringDimensionSchema.create("dim2"));
return new DataSchema(
- DATASOURCE,
- new TimestampSpec("timestamp", "iso", null),
- new DimensionsSpec(dimensions),
- new AggregatorFactory[]{new CountAggregatorFactory("rows")},
- new UniformGranularitySpec(
- Granularities.HOUR,
- Granularities.NONE,
- ImmutableList.of()
- ),
- null
+ DATASOURCE,
+ new TimestampSpec("timestamp", "iso", null),
+ new DimensionsSpec(dimensions),
+ new AggregatorFactory[]{new CountAggregatorFactory("rows")},
+ new UniformGranularitySpec(
+ Granularities.HOUR,
+ Granularities.NONE,
+ ImmutableList.of()
+ ),
+ null
);
}
@@ -873,32 +926,38 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
{
if (scaleOut) {
return new SeekableStreamSupervisorIOConfig(
- "stream",
- new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false),
- 1,
- taskCount,
- new Period("PT1H"),
- new Period("P1D"),
- new Period("PT30S"),
- false,
- new Period("PT30M"),
- null,
- null, mapper.convertValue(getScaleOutProperties(2),
AutoScalerConfig.class), null
- ) {};
+ "stream",
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false),
+ 1,
+ taskCount,
+ new Period("PT1H"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ mapper.convertValue(getScaleOutProperties(2),
AutoScalerConfig.class), null
+ )
+ {
+ };
} else {
return new SeekableStreamSupervisorIOConfig(
- "stream",
- new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false),
- 1,
- taskCount,
- new Period("PT1H"),
- new Period("P1D"),
- new Period("PT30S"),
- false,
- new Period("PT30M"),
- null,
- null, mapper.convertValue(getScaleInProperties(),
AutoScalerConfig.class), null
- ) {};
+ "stream",
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false),
+ 1,
+ taskCount,
+ new Period("PT1H"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class),
null
+ )
+ {
+ };
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]