This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2efb74f  fix supervisor auto scaler config serde bug (#12317)
2efb74f is described below

commit 2efb74ff1e4a9921e236a21475978822be57fe11
Author: Parag Jain <[email protected]>
AuthorDate: Thu Mar 10 05:47:12 2022 +0530

    fix supervisor auto scaler config serde bug (#12317)
---
 .../kafka/supervisor/KafkaSupervisorIOConfig.java  |   2 +-
 .../supervisor/KafkaSupervisorIOConfigTest.java    |  57 +++
 .../supervisor/KinesisSupervisorIOConfig.java      |   2 +-
 .../kinesis/supervisor/KinesisSupervisorTest.java  |   4 +-
 .../supervisor/SeekableStreamSupervisor.java       |   2 +-
 .../SeekableStreamSupervisorIOConfig.java          |   2 +-
 .../supervisor/SeekableStreamSupervisorSpec.java   |   2 +-
 .../SeekableStreamSupervisorSpecTest.java          | 465 ++++++++++++---------
 8 files changed, 326 insertions(+), 210 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index 87b689e..0625ead 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -121,7 +121,7 @@ public class KafkaSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig
            ", taskCount=" + getTaskCount() +
            ", taskDuration=" + getTaskDuration() +
            ", consumerProperties=" + consumerProperties +
-           ", autoScalerConfig=" + getAutoscalerConfig() +
+           ", autoScalerConfig=" + getAutoScalerConfig() +
            ", pollTimeout=" + pollTimeout +
            ", startDelay=" + getStartDelay() +
            ", period=" + getPeriod() +
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 5086689..e503d4f 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -19,21 +19,27 @@
 
 package org.apache.druid.indexing.kafka.supervisor;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
 import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
+import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.hamcrest.CoreMatchers;
 import org.joda.time.Duration;
+import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 public class KafkaSupervisorIOConfigTest
@@ -260,4 +266,55 @@ public class KafkaSupervisorIOConfigTest
             ), KafkaSupervisorIOConfig.class
         );
   }
+
+  @Test
+  public void testAutoScalingConfigSerde() throws JsonProcessingException
+  {
+    HashMap<String, Object> autoScalerConfig = new HashMap<>();
+    autoScalerConfig.put("enableTaskAutoScaler", true);
+    autoScalerConfig.put("lagCollectionIntervalMillis", 500);
+    autoScalerConfig.put("lagCollectionRangeMillis", 500);
+    autoScalerConfig.put("scaleOutThreshold", 0);
+    autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0);
+    autoScalerConfig.put("scaleInThreshold", 1000000);
+    autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
+    autoScalerConfig.put("scaleActionStartDelayMillis", 0);
+    autoScalerConfig.put("scaleActionPeriodMillis", 100);
+    autoScalerConfig.put("taskCountMax", 2);
+    autoScalerConfig.put("taskCountMin", 1);
+    autoScalerConfig.put("scaleInStep", 1);
+    autoScalerConfig.put("scaleOutStep", 2);
+    autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+
+    final Map<String, Object> consumerProperties = 
KafkaConsumerConfigs.getConsumerProperties();
+    consumerProperties.put("bootstrap.servers", "localhost:8082");
+
+    KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new 
KafkaSupervisorIOConfig(
+        "test",
+        null,
+        1,
+        1,
+        new Period("PT1H"),
+        consumerProperties,
+        mapper.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class),
+        KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+        new Period("P1D"),
+        new Period("PT30S"),
+        true,
+        new Period("PT30M"),
+        null,
+        null,
+        null
+    );
+    String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
+    KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = 
mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);
+    Assert.assertNotNull(kafkaSupervisorIOConfig1.getAutoScalerConfig());
+    
Assert.assertTrue(kafkaSupervisorIOConfig1.getAutoScalerConfig().getEnableTaskAutoScaler());
+    Assert.assertEquals(1, 
kafkaSupervisorIOConfig1.getAutoScalerConfig().getTaskCountMin());
+    Assert.assertEquals(2, 
kafkaSupervisorIOConfig1.getAutoScalerConfig().getTaskCountMax());
+    Assert.assertEquals(
+        1200000,
+        
kafkaSupervisorIOConfig1.getAutoScalerConfig().getMinTriggerScaleActionFrequencyMillis()
+    );
+  }
 }
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index 41ae876..220c22f 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -151,7 +151,7 @@ public class KinesisSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig
            ", endpoint='" + endpoint + '\'' +
            ", replicas=" + getReplicas() +
            ", taskCount=" + getTaskCount() +
-           ", autoScalerConfig=" + getAutoscalerConfig() +
+           ", autoScalerConfig=" + getAutoScalerConfig() +
            ", taskDuration=" + getTaskDuration() +
            ", startDelay=" + getStartDelay() +
            ", period=" + getPeriod() +
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 02b5bd2..3bfe6dd 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -531,7 +531,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
             false
     );
 
-    AutoScalerConfig autoscalerConfigNull = 
kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoscalerConfig();
+    AutoScalerConfig autoscalerConfigNull = 
kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoScalerConfig();
     Assert.assertNull(autoscalerConfigNull);
 
     // create KinesisSupervisorIOConfig with autoScalerConfig Empty
@@ -558,7 +558,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
             false
     );
 
-    AutoScalerConfig autoscalerConfig = 
kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoscalerConfig();
+    AutoScalerConfig autoscalerConfig = 
kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoScalerConfig();
     Assert.assertNotNull(autoscalerConfig);
     Assert.assertTrue(autoscalerConfig instanceof LagBasedAutoScalerConfig);
     Assert.assertFalse(autoscalerConfig.getEnableTaskAutoScaler());
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 86579a0..e4445f2 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -765,7 +765,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     this.useExclusiveStartingSequence = useExclusiveStartingSequence;
     this.dataSource = spec.getDataSchema().getDataSource();
     this.ioConfig = spec.getIoConfig();
-    this.autoScalerConfig = ioConfig.getAutoscalerConfig();
+    this.autoScalerConfig = ioConfig.getAutoScalerConfig();
     this.tuningConfig = spec.getTuningConfig();
     this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
     this.supervisorId = supervisorId;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 3ed55ec..6ddb018 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -125,7 +125,7 @@ public abstract class SeekableStreamSupervisorIOConfig
 
   @Nullable
   @JsonProperty
-  public AutoScalerConfig getAutoscalerConfig()
+  public AutoScalerConfig getAutoScalerConfig()
   {
     return autoScalerConfig;
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index ff1d317..ac985ac 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -162,7 +162,7 @@ public abstract class SeekableStreamSupervisorSpec 
implements SupervisorSpec
   @Override
   public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
   {
-    AutoScalerConfig autoScalerConfig = 
ingestionSchema.getIOConfig().getAutoscalerConfig();
+    AutoScalerConfig autoScalerConfig = 
ingestionSchema.getIOConfig().getAutoScalerConfig();
     if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler() 
&& supervisor instanceof SeekableStreamSupervisor) {
       return autoScalerConfig.createAutoScaler(supervisor, this);
     }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index e2142a1..7cdbb74 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -133,15 +133,15 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     private BaseTestSeekableStreamSupervisor()
     {
       super(
-              "testSupervisorId",
-              taskStorage,
-              taskMaster,
-              indexerMetadataStorageCoordinator,
-              taskClientFactory,
-              OBJECT_MAPPER,
-              spec,
-              rowIngestionMetersFactory,
-              false
+          "testSupervisorId",
+          taskStorage,
+          taskMaster,
+          indexerMetadataStorageCoordinator,
+          taskClientFactory,
+          OBJECT_MAPPER,
+          spec,
+          rowIngestionMetersFactory,
+          false
       );
     }
 
@@ -154,7 +154,7 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     @Override
     protected void updatePartitionLagFromStream()
     {
-        // do nothing
+      // do nothing
     }
 
     @Nullable
@@ -173,25 +173,25 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
 
     @Override
     protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
-            int groupId,
-            Map<String, String> startPartitions,
-            Map<String, String> endPartitions,
-            String baseSequenceName,
-            DateTime minimumMessageTime,
-            DateTime maximumMessageTime,
-            Set<String> exclusiveStartSequenceNumberPartitions,
-            SeekableStreamSupervisorIOConfig ioConfig
+        int groupId,
+        Map<String, String> startPartitions,
+        Map<String, String> endPartitions,
+        String baseSequenceName,
+        DateTime minimumMessageTime,
+        DateTime maximumMessageTime,
+        Set<String> exclusiveStartSequenceNumberPartitions,
+        SeekableStreamSupervisorIOConfig ioConfig
     )
     {
       return new SeekableStreamIndexTaskIOConfig<String, String>(
-              groupId,
-              baseSequenceName,
-              new SeekableStreamStartSequenceNumbers<>(STREAM, 
startPartitions, exclusiveStartSequenceNumberPartitions),
-              new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
-              true,
-              minimumMessageTime,
-              maximumMessageTime,
-              ioConfig.getInputFormat()
+          groupId,
+          baseSequenceName,
+          new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions, 
exclusiveStartSequenceNumberPartitions),
+          new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
+          true,
+          minimumMessageTime,
+          maximumMessageTime,
+          ioConfig.getInputFormat()
       )
       {
       };
@@ -199,13 +199,13 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
 
     @Override
     protected List<SeekableStreamIndexTask<String, String, ByteEntity>> 
createIndexTasks(
-            int replicas,
-            String baseSequenceName,
-            ObjectMapper sortingMapper,
-            TreeMap<Integer, Map<String, String>> sequenceOffsets,
-            SeekableStreamIndexTaskIOConfig taskIoConfig,
-            SeekableStreamIndexTaskTuningConfig taskTuningConfig,
-            RowIngestionMetersFactory rowIngestionMetersFactory
+        int replicas,
+        String baseSequenceName,
+        ObjectMapper sortingMapper,
+        TreeMap<Integer, Map<String, String>> sequenceOffsets,
+        SeekableStreamIndexTaskIOConfig taskIoConfig,
+        SeekableStreamIndexTaskTuningConfig taskTuningConfig,
+        RowIngestionMetersFactory rowIngestionMetersFactory
     )
     {
       return null;
@@ -231,8 +231,8 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
 
     @Override
     protected SeekableStreamDataSourceMetadata<String, String> 
createDataSourceMetaDataForReset(
-            String stream,
-            Map<String, String> map
+        String stream,
+        Map<String, String> map
     )
     {
       return null;
@@ -271,27 +271,27 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
 
     @Override
     protected SeekableStreamSupervisorReportPayload<String, String> 
createReportPayload(
-            int numPartitions,
-            boolean includeOffsets
+        int numPartitions,
+        boolean includeOffsets
     )
     {
       return new SeekableStreamSupervisorReportPayload<String, String>(
-              DATASOURCE,
-              STREAM,
-              1,
-              1,
-              1L,
-              null,
-              null,
-              null,
-              null,
-              null,
-              null,
-              false,
-              true,
-              null,
-              null,
-              null
+          DATASOURCE,
+          STREAM,
+          1,
+          1,
+          1L,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null,
+          false,
+          true,
+          null,
+          null,
+          null
       )
       {
       };
@@ -340,7 +340,7 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     @Override
     protected void scheduleReporting(ScheduledExecutorService reportingExec)
     {
-        // do nothing
+      // do nothing
     }
 
     @Override
@@ -362,34 +362,37 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     private SeekableStreamSupervisor supervisor;
     private String id;
 
-    public 
TestSeekableStreamSupervisorSpec(SeekableStreamSupervisorIngestionSpec 
ingestionSchema,
-                                             @Nullable Map<String, Object> 
context,
-                                             Boolean suspended,
-                                             TaskStorage taskStorage,
-                                             TaskMaster taskMaster,
-                                             IndexerMetadataStorageCoordinator 
indexerMetadataStorageCoordinator,
-                                             
SeekableStreamIndexTaskClientFactory indexTaskClientFactory,
-                                             ObjectMapper mapper,
-                                             ServiceEmitter emitter,
-                                             DruidMonitorSchedulerConfig 
monitorSchedulerConfig,
-                                             RowIngestionMetersFactory 
rowIngestionMetersFactory,
-                                             SupervisorStateManagerConfig 
supervisorStateManagerConfig,
-                                             SeekableStreamSupervisor 
supervisor,
-                                             String id)
+    public TestSeekableStreamSupervisorSpec(
+        SeekableStreamSupervisorIngestionSpec ingestionSchema,
+        @Nullable Map<String, Object> context,
+        Boolean suspended,
+        TaskStorage taskStorage,
+        TaskMaster taskMaster,
+        IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
+        SeekableStreamIndexTaskClientFactory indexTaskClientFactory,
+        ObjectMapper mapper,
+        ServiceEmitter emitter,
+        DruidMonitorSchedulerConfig monitorSchedulerConfig,
+        RowIngestionMetersFactory rowIngestionMetersFactory,
+        SupervisorStateManagerConfig supervisorStateManagerConfig,
+        SeekableStreamSupervisor supervisor,
+        String id
+    )
     {
       super(
-              ingestionSchema,
-              context,
-              suspended,
-              taskStorage,
-              taskMaster,
-              indexerMetadataStorageCoordinator,
-              indexTaskClientFactory,
-              mapper,
-              emitter,
-              monitorSchedulerConfig,
-              rowIngestionMetersFactory,
-              supervisorStateManagerConfig);
+          ingestionSchema,
+          context,
+          suspended,
+          taskStorage,
+          taskMaster,
+          indexerMetadataStorageCoordinator,
+          indexTaskClientFactory,
+          mapper,
+          emitter,
+          monitorSchedulerConfig,
+          rowIngestionMetersFactory,
+          supervisorStateManagerConfig
+      );
 
       this.supervisor = supervisor;
       this.id = id;
@@ -482,26 +485,26 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
       public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
       {
         return new SeekableStreamIndexTaskTuningConfig(
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null
         )
         {
           @Override
@@ -530,17 +533,30 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     AutoScalerConfig autoScalerConfigNull = mapper.convertValue(null, 
AutoScalerConfig.class);
     Assert.assertNull(autoScalerConfigNull);
 
-    AutoScalerConfig autoScalerConfigDefault = 
mapper.convertValue(ImmutableMap.of("autoScalerStrategy", "lagBased"), 
AutoScalerConfig.class);
+    AutoScalerConfig autoScalerConfigDefault = mapper.convertValue(
+        ImmutableMap.of("autoScalerStrategy", "lagBased"),
+        AutoScalerConfig.class
+    );
     Assert.assertTrue(autoScalerConfigDefault instanceof 
LagBasedAutoScalerConfig);
 
-    AutoScalerConfig autoScalerConfigValue = 
mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1"), 
AutoScalerConfig.class);
+    AutoScalerConfig autoScalerConfigValue = mapper.convertValue(
+        ImmutableMap.of("lagCollectionIntervalMillis", "1"),
+        AutoScalerConfig.class
+    );
     Assert.assertTrue(autoScalerConfigValue instanceof 
LagBasedAutoScalerConfig);
     LagBasedAutoScalerConfig lagBasedAutoScalerConfig = 
(LagBasedAutoScalerConfig) autoScalerConfigValue;
     
Assert.assertEquals(lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), 
1);
 
     Exception e = null;
     try {
-      AutoScalerConfig autoScalerError = 
mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true", 
"taskCountMax", "1", "taskCountMin", "4"), AutoScalerConfig.class);
+      AutoScalerConfig autoScalerError = mapper.convertValue(ImmutableMap.of(
+          "enableTaskAutoScaler",
+          "true",
+          "taskCountMax",
+          "1",
+          "taskCountMin",
+          "4"
+      ), AutoScalerConfig.class);
     }
     catch (RuntimeException ex) {
       e = ex;
@@ -550,14 +566,15 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     e = null;
     try {
       // taskCountMax and taskCountMin couldn't be ignored.
-      AutoScalerConfig autoScalerError2 = 
mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true"), 
AutoScalerConfig.class);
+      AutoScalerConfig autoScalerError2 = mapper.convertValue(
+          ImmutableMap.of("enableTaskAutoScaler", "true"),
+          AutoScalerConfig.class
+      );
     }
     catch (RuntimeException ex) {
       e = ex;
     }
     Assert.assertNotNull(e);
-
-
   }
 
   @Test
@@ -584,51 +601,60 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
     EasyMock.replay(ingestionSchema);
 
-    
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig,
 AutoScalerConfig.class)).anyTimes();
+    EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
+            .andReturn(mapper.convertValue(autoScalerConfig, 
AutoScalerConfig.class))
+            .anyTimes();
     EasyMock.replay(seekableStreamSupervisorIOConfig);
 
     
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
     EasyMock.replay(supervisor4);
 
-    TestSeekableStreamSupervisorSpec spec = new 
TestSeekableStreamSupervisorSpec(ingestionSchema,
-            null,
-            false,
-            taskStorage,
-            taskMaster,
-            indexerMetadataStorageCoordinator,
-            indexTaskClientFactory,
-            mapper,
-            emitter,
-            monitorSchedulerConfig,
-            rowIngestionMetersFactory,
-            supervisorStateManagerConfig,
-            supervisor4,
-            "id1");
+    TestSeekableStreamSupervisorSpec spec = new 
TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        null,
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    );
     SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4);
     Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler);
 
     EasyMock.reset(seekableStreamSupervisorIOConfig);
     autoScalerConfig.put("enableTaskAutoScaler", false);
-    
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig,
 AutoScalerConfig.class)).anyTimes();
+    EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
+            .andReturn(mapper.convertValue(autoScalerConfig, 
AutoScalerConfig.class))
+            .anyTimes();
     EasyMock.replay(seekableStreamSupervisorIOConfig);
     SupervisorTaskAutoScaler autoscaler2 = spec.createAutoscaler(supervisor4);
     Assert.assertTrue(autoscaler2 instanceof NoopTaskAutoScaler);
 
     EasyMock.reset(seekableStreamSupervisorIOConfig);
     autoScalerConfig.remove("enableTaskAutoScaler");
-    
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig,
 AutoScalerConfig.class)).anyTimes();
+    EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
+            .andReturn(mapper.convertValue(autoScalerConfig, 
AutoScalerConfig.class))
+            .anyTimes();
     EasyMock.replay(seekableStreamSupervisorIOConfig);
     SupervisorTaskAutoScaler autoscaler3 = spec.createAutoscaler(supervisor4);
     Assert.assertTrue(autoscaler3 instanceof NoopTaskAutoScaler);
 
     EasyMock.reset(seekableStreamSupervisorIOConfig);
     autoScalerConfig.clear();
-    
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig,
 AutoScalerConfig.class)).anyTimes();
+    EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
+            .andReturn(mapper.convertValue(autoScalerConfig, 
AutoScalerConfig.class))
+            .anyTimes();
     EasyMock.replay(seekableStreamSupervisorIOConfig);
     Assert.assertTrue(autoScalerConfig.isEmpty());
     SupervisorTaskAutoScaler autoscaler4 = spec.createAutoscaler(supervisor4);
     Assert.assertTrue(autoscaler4 instanceof NoopTaskAutoScaler);
-
   }
 
   @Test
@@ -639,26 +665,38 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
     EasyMock.replay(ingestionSchema);
 
-    
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis",
 "1", "enableTaskAutoScaler", true, "taskCountMax", "4", "taskCountMin", "1"), 
AutoScalerConfig.class)).anyTimes();
+    EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
+            
.andReturn(mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis",
+                                                           "1",
+                                                           
"enableTaskAutoScaler",
+                                                           true,
+                                                           "taskCountMax",
+                                                           "4",
+                                                           "taskCountMin",
+                                                           "1"
+            ), AutoScalerConfig.class))
+            .anyTimes();
     EasyMock.replay(seekableStreamSupervisorIOConfig);
 
     
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
     EasyMock.replay(supervisor4);
 
-    TestSeekableStreamSupervisorSpec spec = new 
TestSeekableStreamSupervisorSpec(ingestionSchema,
-            null,
-            false,
-            taskStorage,
-            taskMaster,
-            indexerMetadataStorageCoordinator,
-            indexTaskClientFactory,
-            mapper,
-            emitter,
-            monitorSchedulerConfig,
-            rowIngestionMetersFactory,
-            supervisorStateManagerConfig,
-            supervisor4,
-            "id1");
+    TestSeekableStreamSupervisorSpec spec = new 
TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        null,
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    );
     SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4);
     Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler);
     LagBasedAutoScaler lagBasedAutoScaler = (LagBasedAutoScaler) autoscaler;
@@ -679,7 +717,6 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
   @Test
   public void testSeekableStreamSupervisorSpecWithScaleOut() throws 
InterruptedException
   {
-
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
 
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
@@ -700,27 +737,31 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
 
     TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(3);
 
-    LagStats lagStats = supervisor.computeLagStats();
-
-    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, 
DATASOURCE, mapper.convertValue(getScaleOutProperties(2), 
LagBasedAutoScalerConfig.class), spec);
+    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+        supervisor,
+        DATASOURCE,
+        mapper.convertValue(
+            getScaleOutProperties(2),
+            LagBasedAutoScalerConfig.class
+        ),
+        spec
+    );
     supervisor.start();
     autoScaler.start();
     supervisor.runInternal();
     int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(1, taskCountBeforeScaleOut);
-    Thread.sleep(1 * 1000);
+    Thread.sleep(1000);
     int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(2, taskCountAfterScaleOut);
 
     autoScaler.reset();
     autoScaler.stop();
-
   }
 
   @Test
   public void 
testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() throws 
InterruptedException
   {
-
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
 
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
@@ -740,28 +781,31 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     EasyMock.replay(taskMaster);
 
     TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(2);
-
-    LagStats lagStats = supervisor.computeLagStats();
-
-    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, 
DATASOURCE, mapper.convertValue(getScaleOutProperties(3), 
LagBasedAutoScalerConfig.class), spec);
+    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+        supervisor,
+        DATASOURCE,
+        mapper.convertValue(
+            getScaleOutProperties(3),
+            LagBasedAutoScalerConfig.class
+        ),
+        spec
+    );
     supervisor.start();
     autoScaler.start();
     supervisor.runInternal();
     int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(1, taskCountBeforeScaleOut);
-    Thread.sleep(1 * 1000);
+    Thread.sleep(1000);
     int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(2, taskCountAfterScaleOut);
 
     autoScaler.reset();
     autoScaler.stop();
-
   }
 
   @Test
   public void testSeekableStreamSupervisorSpecWithScaleIn() throws 
InterruptedException
   {
-
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
 
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
@@ -781,7 +825,15 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     EasyMock.replay(taskMaster);
 
     TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(3);
-    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, 
DATASOURCE, mapper.convertValue(getScaleInProperties(), 
LagBasedAutoScalerConfig.class), spec);
+    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+        supervisor,
+        DATASOURCE,
+        mapper.convertValue(
+            getScaleInProperties(),
+            LagBasedAutoScalerConfig.class
+        ),
+        spec
+    );
 
     // enable autoscaler so that taskcount config will be ignored and init 
value of taskCount will use taskCountMin.
     Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
@@ -791,7 +843,7 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     supervisor.runInternal();
     int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(2, taskCountBeforeScaleOut);
-    Thread.sleep(1 * 1000);
+    Thread.sleep(1000);
     int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(1, taskCountAfterScaleOut);
 
@@ -802,20 +854,21 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
   @Test
   public void testSeekableStreamSupervisorSpecWithScaleDisable() throws 
InterruptedException
   {
-
     SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig = new 
SeekableStreamSupervisorIOConfig(
-            "stream",
-            new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false),
-            1,
-            1,
-            new Period("PT1H"),
-            new Period("P1D"),
-            new Period("PT30S"),
-            false,
-            new Period("PT30M"),
-            null,
-            null, null, null
-    ) {};
+        "stream",
+        new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false),
+        1,
+        1,
+        new Period("PT1H"),
+        new Period("P1D"),
+        new Period("PT30S"),
+        false,
+        new Period("PT30M"),
+        null,
+        null, null, null
+    )
+    {
+    };
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
 
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
@@ -856,16 +909,16 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     dimensions.add(StringDimensionSchema.create("dim2"));
 
     return new DataSchema(
-            DATASOURCE,
-            new TimestampSpec("timestamp", "iso", null),
-            new DimensionsSpec(dimensions),
-            new AggregatorFactory[]{new CountAggregatorFactory("rows")},
-            new UniformGranularitySpec(
-                    Granularities.HOUR,
-                    Granularities.NONE,
-                    ImmutableList.of()
-            ),
-            null
+        DATASOURCE,
+        new TimestampSpec("timestamp", "iso", null),
+        new DimensionsSpec(dimensions),
+        new AggregatorFactory[]{new CountAggregatorFactory("rows")},
+        new UniformGranularitySpec(
+            Granularities.HOUR,
+            Granularities.NONE,
+            ImmutableList.of()
+        ),
+        null
     );
   }
 
@@ -873,32 +926,38 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
   {
     if (scaleOut) {
       return new SeekableStreamSupervisorIOConfig(
-              "stream",
-              new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false),
-              1,
-              taskCount,
-              new Period("PT1H"),
-              new Period("P1D"),
-              new Period("PT30S"),
-              false,
-              new Period("PT30M"),
-              null,
-              null, mapper.convertValue(getScaleOutProperties(2), 
AutoScalerConfig.class), null
-      ) {};
+          "stream",
+          new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false),
+          1,
+          taskCount,
+          new Period("PT1H"),
+          new Period("P1D"),
+          new Period("PT30S"),
+          false,
+          new Period("PT30M"),
+          null,
+          null,
+          mapper.convertValue(getScaleOutProperties(2), 
AutoScalerConfig.class), null
+      )
+      {
+      };
     } else {
       return new SeekableStreamSupervisorIOConfig(
-              "stream",
-              new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false),
-              1,
-              taskCount,
-              new Period("PT1H"),
-              new Period("P1D"),
-              new Period("PT30S"),
-              false,
-              new Period("PT30M"),
-              null,
-              null, mapper.convertValue(getScaleInProperties(), 
AutoScalerConfig.class), null
-        ) {};
+          "stream",
+          new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false),
+          1,
+          taskCount,
+          new Period("PT1H"),
+          new Period("P1D"),
+          new Period("PT30S"),
+          false,
+          new Period("PT30M"),
+          null,
+          null,
+          mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class), 
null
+      )
+      {
+      };
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to