pjain1 commented on a change in pull request #10524:
URL: https://github.com/apache/druid/pull/10524#discussion_r581913481



##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +157,37 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig()
   @Override
   public abstract Supervisor createSupervisor();
 
+  /**
+   * need to notice that autoScaler would be null which means autoscale is dissable.
+   * @param supervisor
+   * @return autoScaler, disable autoscale will return dummyAutoScaler and enable autoscale wiil return defaultAutoScaler by default.
+   */
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using siwtch(String)")
+  public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)

Review comment:
       Because a switch statement creates the `autoScaler` instance here, extensions cannot supply a custom autoScaler implementation. If an extension implements a new autoScaler strategy, this class has to change to support it, which is not ideal since that is a change in core Druid. I have raised a PR on your branch showing how we can fix this: https://github.com/zhangyue19921010/druid/pull/1

   The changes are:
   1. Add a `SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec);` method to `AutoScalerConfig`, which `SeekableStreamSupervisorSpec` calls to create the autoScaler.
   2. I don't think the `getAutoScalerStrategy` method is needed in `AutoScalerConfig`, since each `AutoScalerConfig` implementation can return its `AutoScaler` instance directly from `createAutoScaler`.
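
A minimal plain-Java sketch of the suggested design: the config object builds its own autoscaler, and the spec only delegates, so no switch over strategy names is needed. The nested interfaces are simplified, hypothetical stand-ins for the Druid types of the same names (the real ones declare more methods), and this is not the code in the linked PR.

```java
// Hypothetical, simplified stand-ins for the Druid types discussed above.
class AutoScalerFactorySketch
{
  interface Supervisor
  {
  }

  interface SupervisorSpec
  {
  }

  interface SupervisorTaskAutoscaler
  {
    void start();

    void stop();
  }

  // Each config implementation creates its own autoscaler, so a strategy
  // shipped in an extension only needs a new AutoScalerConfig implementation.
  interface AutoScalerConfig
  {
    boolean getEnableTaskAutoScaler();

    SupervisorTaskAutoscaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec);
  }

  // Returned when autoscaling is disabled or not configured.
  static final SupervisorTaskAutoscaler NOOP = new SupervisorTaskAutoscaler()
  {
    @Override
    public void start()
    {
    }

    @Override
    public void stop()
    {
    }
  };

  // Roughly what SeekableStreamSupervisorSpec#createAutoscaler could reduce to:
  // delegation to the configured instance instead of a switch on a strategy name.
  static SupervisorTaskAutoscaler createAutoscaler(
      AutoScalerConfig config,
      Supervisor supervisor,
      SupervisorSpec spec
  )
  {
    if (config == null || !config.getEnableTaskAutoScaler()) {
      return NOOP;
    }
    return config.createAutoScaler(supervisor, spec);
  }
}
```

With this shape, registering a new config subtype (for example via Jackson subtypes in an extension module) is enough; the spec class itself does not change.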

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/DummyAutoScaler.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoscaler;
+
+public class DummyAutoScaler implements SupervisorTaskAutoscaler

Review comment:
       
https://github.com/zhangyue19921010/druid/pull/1/files#diff-0621db18fa2257d0cd499178c842d96ae73df2264038ccc7de23d7a7ed5235a5

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
##########
@@ -0,0 +1,40 @@
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.guice.annotations.UnstableApi;
+
+@UnstableApi
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", defaultImpl = DefaultAutoScalerConfig.class)
+@JsonSubTypes(value = {
+        @Type(name = "default", value = DefaultAutoScalerConfig.class)

Review comment:
       I think we should rename this to `LagBasedAutoScalerConfig`, since it uses lag to make auto-scaling decisions. Also, we should not use `default` as a type name because it is confusing. See https://github.com/zhangyue19921010/druid/pull/1

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/DefaultAutoScalerConfig.java
##########
@@ -0,0 +1,185 @@
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+public class DefaultAutoScalerConfig implements AutoScalerConfig

Review comment:
       See https://github.com/zhangyue19921010/druid/pull/1/files#diff-943c4b0695e902cb2a3465b69f593a584dac7308037288db0f9fd97054efb12b for suggestions on log lines, method and variable names.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/DummyAutoScaler.java
##########
@@ -0,0 +1,48 @@
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoscaler;
+
+public class DummyAutoScaler implements SupervisorTaskAutoscaler

Review comment:
       I think we can call this `NoopAutoScaler`.

##########
File path: 
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
##########
@@ -64,4 +65,12 @@ default Boolean isHealthy()
    * @param checkpointMetadata metadata for the sequence to currently checkpoint
    */
   void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+  /**
+   * Collect maxLag, totalLag, avgLag
+   * Only support Kafka ingestion so far.
+   */

Review comment:
       ```suggestion
     /**
      * Computes maxLag, totalLag and avgLag
      * Only supports Kafka ingestion so far.
      */
   ```
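
As a sketch only, the three values could be derived from per-partition lags like this. `LagStats` here is a hypothetical local class, and the input (a map from partition id to lag in messages) is an assumption about what the supervisor has available; integer division truncates the average.

```java
import java.util.Map;

// Hypothetical value holder for the three numbers named in the javadoc.
final class LagStats
{
  private final long maxLag;
  private final long totalLag;
  private final long avgLag;

  LagStats(long maxLag, long totalLag, long avgLag)
  {
    this.maxLag = maxLag;
    this.totalLag = totalLag;
    this.avgLag = avgLag;
  }

  long getMaxLag()
  {
    return maxLag;
  }

  long getTotalLag()
  {
    return totalLag;
  }

  long getAvgLag()
  {
    return avgLag;
  }
}

final class LagStatsSketch
{
  // partitionLags: partition id -> lag (assumed non-negative).
  static LagStats computeLagStats(Map<Integer, Long> partitionLags)
  {
    long max = 0;
    long total = 0;
    for (long lag : partitionLags.values()) {
      max = Math.max(max, lag);
      total += lag;
    }
    // Avoid division by zero when no partitions are assigned yet.
    long avg = partitionLags.isEmpty() ? 0 : total / partitionLags.size();
    return new LagStats(max, total, avg);
  }
}
```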

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/DummyAutoScaler.java
##########
@@ -0,0 +1,48 @@
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoscaler;
+
+public class DummyAutoScaler implements SupervisorTaskAutoscaler
+{
+  public DummyAutoScaler(Supervisor supervisor, String dataSource)

Review comment:
       This can be a no-arg constructor.
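
A sketch combining the rename suggested earlier with a no-arg constructor: a no-op autoscaler holds no supervisor or dataSource state, so it needs no constructor arguments. The `SupervisorTaskAutoscaler` interface below is a hypothetical local stand-in, and its `start`/`stop`/`reset` method set is an assumption, not copied from the PR.

```java
// Hypothetical stand-in; the real interface's method set may differ.
interface SupervisorTaskAutoscaler
{
  void start();

  void stop();

  void reset();
}

// Used when autoscaling is disabled: every lifecycle call does nothing.
class NoopAutoScaler implements SupervisorTaskAutoscaler
{
  NoopAutoScaler()
  {
    // no supervisor or dataSource reference is needed
  }

  @Override
  public void start()
  {
    // nothing to schedule
  }

  @Override
  public void stop()
  {
    // nothing to shut down
  }

  @Override
  public void reset()
  {
    // no state to clear
  }
}
```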

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
##########
@@ -0,0 +1,40 @@
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.guice.annotations.UnstableApi;
+
+@UnstableApi
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", defaultImpl = DefaultAutoScalerConfig.class)
+@JsonSubTypes(value = {
+        @Type(name = "default", value = DefaultAutoScalerConfig.class)
+})
+public interface AutoScalerConfig
+{
+  boolean getEnableTaskAutoscaler();
+  long getMinTriggerDynamicFrequencyMillis();
+  String getAutoScalerStrategy();

Review comment:
       As mentioned above, this method is not required. Instead, add a method that creates the autoscaler, as described in the earlier comment.

##########
File path: 
integration-tests/src/test/resources/stream/data/supervisor_with_autoscaler_spec_template.json
##########
@@ -0,0 +1,73 @@
+{
+  "type": "%%STREAM_TYPE%%",
+  "dataSchema": {
+    "dataSource": "%%DATASOURCE%%",
+    "parser": %%PARSER%%,
+    "timestampSpec": {
+      "column": "timestamp",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
+      "dimensionExclusions": [],
+      "spatialDimensions": []
+    },
+    "metricsSpec": [
+      {
+        "type": "count",
+        "name": "count"
+      },
+      {
+        "type": "doubleSum",
+        "name": "added",
+        "fieldName": "added"
+      },
+      {
+        "type": "doubleSum",
+        "name": "deleted",
+        "fieldName": "deleted"
+      },
+      {
+        "type": "doubleSum",
+        "name": "delta",
+        "fieldName": "delta"
+      }
+    ],
+    "granularitySpec": {
+      "type": "uniform",
+      "segmentGranularity": "MINUTE",
+      "queryGranularity": "NONE"
+    }
+  },
+  "tuningConfig": {
+    "type": "%%STREAM_TYPE%%",
+    "intermediatePersistPeriod": "PT30S",
+    "maxRowsPerSegment": 5000000,
+    "maxRowsInMemory": 500000
+  },
+  "ioConfig": {
+    "%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
+    "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
+    "autoscalerConfig": {

Review comment:
       Variable names here will need to change if these suggestions are implemented: https://github.com/zhangyue19921010/druid/pull/1/files#diff-943c4b0695e902cb2a3465b69f593a584dac7308037288db0f9fd97054efb12b

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/DefaultAutoScaler.java
##########
@@ -0,0 +1,244 @@
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoscaler;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class DefaultAutoScaler implements SupervisorTaskAutoscaler
+{
+  private static final EmittingLogger log = new EmittingLogger(DefaultAutoScaler.class);
+  private final String dataSource;
+  private final CircularFifoQueue<Long> lagMetricsQueue;
+  private final ScheduledExecutorService lagComputationExec;
+  private final ScheduledExecutorService allocationExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final DefaultAutoScalerConfig defaultAutoScalerConfig;
+
+  private static ReentrantLock lock = new ReentrantLock(true);
+
+
+  public DefaultAutoScaler(Supervisor supervisor, String dataSource, AutoScalerConfig autoScalerConfig, SupervisorSpec spec)
+  {
+    this.defaultAutoScalerConfig = (DefaultAutoScalerConfig) autoScalerConfig;
+    String supervisorId = StringUtils.format("KafkaSupervisor-%s", dataSource);
+    this.dataSource = dataSource;
+    int slots = (int) (defaultAutoScalerConfig.getMetricsCollectionRangeMillis() / defaultAutoScalerConfig.getMetricsCollectionIntervalMillis()) + 1;
+    log.debug(" The interval of metrics collection is [%s], [%s] timeRange will collect [%s] data points for dataSource [%s].", defaultAutoScalerConfig.getMetricsCollectionIntervalMillis(), defaultAutoScalerConfig.getMetricsCollectionRangeMillis(), slots, dataSource);
+    this.lagMetricsQueue = new CircularFifoQueue<>(slots);
+    this.allocationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Allocation-%d");

Review comment:
       Not sure why we are encoding `supervisorId`; it's already a constant string from line 56, so this doesn't seem to be needed.
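
Separately from the encoding question, the constructor quoted in this hunk sizes its lag window as `getMetricsCollectionRangeMillis() / getMetricsCollectionIntervalMillis() + 1` slots and stores samples in a `CircularFifoQueue`, which drops its oldest element when full. The JDK-only sketch below mimics that behavior with an `ArrayDeque`; `LagWindow` and its methods are hypothetical names, not part of the PR.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Bounded FIFO of lag samples; a stand-in for CircularFifoQueue<Long>.
class LagWindow
{
  private final int capacity;
  private final Deque<Long> samples = new ArrayDeque<>();

  LagWindow(long rangeMillis, long intervalMillis)
  {
    // Same sizing as the quoted constructor: one sample per interval over the range, plus one.
    this.capacity = (int) (rangeMillis / intervalMillis) + 1;
  }

  void add(long lag)
  {
    if (samples.size() == capacity) {
      samples.removeFirst(); // evict the oldest sample once the window is full
    }
    samples.addLast(lag);
  }

  int capacity()
  {
    return capacity;
  }

  int size()
  {
    return samples.size();
  }

  Long oldest()
  {
    return samples.peekFirst();
  }
}
```

For example, a 10-minute range sampled every 30 seconds gives 600000 / 30000 + 1 = 21 slots.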

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/DefaultAutoScaler.java
##########
@@ -0,0 +1,244 @@
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoscaler;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class DefaultAutoScaler implements SupervisorTaskAutoscaler

Review comment:
       See https://github.com/zhangyue19921010/druid/pull/1/files#diff-f1b33808bb841d1e71e1f5ec3fbaeb3f94899066277b75e192942b66371667ce for suggestions on log lines, method and variable names.



