This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 53b1639ab2fe53f6e8b5aac707d325f86f54b603
Author: Matyas Orhidi <[email protected]>
AuthorDate: Thu Dec 1 08:58:10 2022 +0100

    [FLINK-30260][autoscaler] Add JobAutoScaler component and configs
---
 .../operator/autoscaler/AutoScalerInfo.java        | 189 +++++++++++++++++++
 .../kubernetes/operator/autoscaler/Cleanup.java    |  30 +++
 .../operator/autoscaler/JobAutoScaler.java         | 204 +++++++++++++++++++++
 .../autoscaler/config/AutoScalerOptions.java       | 121 ++++++++++++
 4 files changed, 544 insertions(+)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
new file mode 100644
index 00000000..1236cc65
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import 
org.apache.flink.kubernetes.operator.autoscaler.utils.JobVertexSerDeModule;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import lombok.SneakyThrows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** Class for encapsulating information stored for each resource when using 
the autoscaler. */
+public class AutoScalerInfo {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobAutoScaler.class);
+
+    private static final int SCALING_HISTORY_MAX_COUNT = 5;
+    private static final Duration SCALING_HISTORY_MAX_DURATION = 
Duration.ofHours(24);
+
+    private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
+
+    private static final String COLLECTED_METRICS_KEY = "collectedMetrics";
+    private static final String SCALING_HISTORY_KEY = "scalingHistory";
+    private static final String JOB_START_TS_KEY = "jobStartTs";
+
+    private static final ObjectMapper YAML_MAPPER =
+            new ObjectMapper(new YAMLFactory())
+                    .registerModule(new JavaTimeModule())
+                    .registerModule(new JobVertexSerDeModule());
+
+    private final ConfigMap configMap;
+    private Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory;
+
+    public AutoScalerInfo(ConfigMap configMap) {
+        this.configMap = configMap;
+    }
+
+    @VisibleForTesting
+    public AutoScalerInfo(Map<String, String> data) {
+        this(new ConfigMap());
+        configMap.setData(Preconditions.checkNotNull(data));
+    }
+
+    @SneakyThrows
+    public SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> 
getMetricHistory() {
+        var historyYaml = configMap.getData().get(COLLECTED_METRICS_KEY);
+        if (historyYaml == null) {
+            return new TreeMap<>();
+        }
+
+        return YAML_MAPPER.readValue(historyYaml, new TypeReference<>() {});
+    }
+
+    @SneakyThrows
+    public void updateMetricHistory(
+            Instant jobStartTs,
+            SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> 
history) {
+        configMap.getData().put(COLLECTED_METRICS_KEY, 
YAML_MAPPER.writeValueAsString(history));
+        configMap.getData().put(JOB_START_TS_KEY, jobStartTs.toString());
+    }
+
+    public void clearMetricHistory() {
+        configMap.getData().remove(COLLECTED_METRICS_KEY);
+        configMap.getData().remove(JOB_START_TS_KEY);
+    }
+
+    public Optional<Instant> getJobStartTs() {
+        return 
Optional.ofNullable(configMap.getData().get(JOB_START_TS_KEY)).map(Instant::parse);
+    }
+
+    @SneakyThrows
+    public Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
getScalingHistory() {
+        if (scalingHistory != null) {
+            return scalingHistory;
+        }
+        var yaml = configMap.getData().get(SCALING_HISTORY_KEY);
+        scalingHistory =
+                yaml == null
+                        ? new HashMap<>()
+                        : YAML_MAPPER.readValue(yaml, new TypeReference<>() 
{});
+        return scalingHistory;
+    }
+
+    @SneakyThrows
+    public void addToScalingHistory(Instant now, Map<JobVertexID, 
ScalingSummary> summaries) {
+        // Make sure to init history
+        getScalingHistory();
+
+        summaries.forEach(
+                (id, summary) ->
+                        scalingHistory.computeIfAbsent(id, j -> new 
TreeMap<>()).put(now, summary));
+
+        var entryIt = scalingHistory.entrySet().iterator();
+        while (entryIt.hasNext()) {
+            var entry = entryIt.next();
+            // Limit how long past scaling decisions are remembered
+            
entry.setValue(entry.getValue().tailMap(now.minus(SCALING_HISTORY_MAX_DURATION)));
+            var vertexHistory = entry.getValue();
+            while (vertexHistory.size() > SCALING_HISTORY_MAX_COUNT) {
+                vertexHistory.remove(vertexHistory.lastKey());
+            }
+            if (vertexHistory.isEmpty()) {
+                entryIt.remove();
+            }
+        }
+
+        configMap
+                .getData()
+                .put(SCALING_HISTORY_KEY, 
YAML_MAPPER.writeValueAsString(scalingHistory));
+    }
+
+    public void replaceInKubernetes(KubernetesClient client) {
+        client.resource(configMap).replace();
+    }
+
+    public static AutoScalerInfo forResource(
+            AbstractFlinkResource<?, ?> cr, KubernetesClient kubeClient) {
+
+        var objectMeta = new ObjectMeta();
+        objectMeta.setName("autoscaler-" + cr.getMetadata().getName());
+        objectMeta.setNamespace(cr.getMetadata().getNamespace());
+
+        ConfigMap infoCm =
+                getScalingInfoConfigMap(objectMeta, kubeClient)
+                        .orElseGet(
+                                () -> {
+                                    LOG.info("Creating scaling info config 
map");
+
+                                    objectMeta.setLabels(
+                                            Map.of(
+                                                    
Constants.LABEL_COMPONENT_KEY,
+                                                    LABEL_COMPONENT_AUTOSCALER,
+                                                    Constants.LABEL_APP_KEY,
+                                                    
cr.getMetadata().getName()));
+                                    var cm = new ConfigMap();
+                                    cm.setMetadata(objectMeta);
+                                    cm.addOwnerReference(cr);
+                                    cm.setData(new HashMap<>());
+                                    return kubeClient.resource(cm).create();
+                                });
+
+        return new AutoScalerInfo(infoCm);
+    }
+
+    private static Optional<ConfigMap> getScalingInfoConfigMap(
+            ObjectMeta objectMeta, KubernetesClient kubeClient) {
+        return Optional.ofNullable(
+                kubeClient
+                        .configMaps()
+                        .inNamespace(objectMeta.getNamespace())
+                        .withName(objectMeta.getName())
+                        .get());
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
new file mode 100644
index 00000000..ae1b98bb
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+
/**
 * Cleanup interface for autoscaling related metadata.
 *
 * <p>Implemented by autoscaler components that keep per-resource state (in memory or in
 * Kubernetes) which must be discarded when the owning custom resource goes away.
 */
public interface Cleanup {
    /**
     * Method is called when a custom resource is deleted.
     *
     * @param cr custom resource
     */
    void cleanup(AbstractFlinkResource<?, ?> cr);
}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
new file mode 100644
index 00000000..d9fb0041
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+
/** Application and SessionJob autoscaler. */
public class JobAutoScaler implements Cleanup {

    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScaler.class);

    private final KubernetesClient kubernetesClient;
    private final FlinkConfigManager configManager;
    private final ScalingMetricCollector metricsCollector;
    private final ScalingMetricEvaluator evaluator;
    private final ScalingExecutor scalingExecutor;
    private final KubernetesOperatorMetricGroup metricGroup;

    // Last evaluated metrics per resource; also read by the gauges registered in
    // registerResourceScalingMetrics, hence the concurrent map.
    private final Map<ResourceID, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>
            lastEvaluatedMetrics = new ConcurrentHashMap<>();
    // Job vertices for which gauges have already been registered, to avoid duplicates.
    private final Map<ResourceID, Set<JobVertexID>> registeredMetrics = new ConcurrentHashMap<>();

    public JobAutoScaler(
            KubernetesClient kubernetesClient,
            FlinkConfigManager configManager,
            ScalingMetricCollector metricsCollector,
            ScalingMetricEvaluator evaluator,
            ScalingExecutor scalingExecutor,
            KubernetesOperatorMetricGroup metricGroup) {
        this.kubernetesClient = kubernetesClient;

        this.configManager = configManager;
        this.metricsCollector = metricsCollector;
        this.evaluator = evaluator;
        this.scalingExecutor = scalingExecutor;
        this.metricGroup = metricGroup;
    }

    /** Drops all autoscaling state kept for the deleted custom resource. */
    @Override
    public void cleanup(AbstractFlinkResource<?, ?> cr) {
        LOG.info("Cleaning up autoscaling meta data");
        metricsCollector.cleanup(cr);
        scalingExecutor.cleanup(cr);
        var resourceId = ResourceID.fromResource(cr);
        lastEvaluatedMetrics.remove(resourceId);
        registeredMetrics.remove(resourceId);
    }

    /**
     * Runs one autoscaling cycle for the given resource: collect metrics, evaluate them, and
     * (possibly) rescale the job. Any exception is caught and logged so that a scaling failure
     * never breaks the operator's reconciliation loop.
     *
     * @param resource the Flink custom resource to scale
     * @param flinkService service used to talk to the Flink cluster
     * @param conf effective configuration of the resource
     * @param context operator reconciliation context
     * @return true if the resource spec was adjusted by the scaler, false otherwise
     */
    public boolean scale(
            AbstractFlinkResource<?, ?> resource,
            FlinkService flinkService,
            Configuration conf,
            Context<?> context) {

        // Only job-type resources with the autoscaler explicitly enabled are considered.
        if (resource.getSpec().getJob() == null || !conf.getBoolean(AUTOSCALER_ENABLED)) {
            LOG.info("Job autoscaler is disabled");
            return false;
        }

        // Metrics are only meaningful while the job is actually RUNNING.
        if (!resource.getStatus().getJobStatus().getState().equals(JobStatus.RUNNING.name())) {
            LOG.info("Job autoscaler is waiting for RUNNING job state");
            return false;
        }

        try {
            var autoScalerInfo = AutoScalerInfo.forResource(resource, kubernetesClient);

            LOG.info("Collecting metrics for scaling");
            var collectedMetrics =
                    metricsCollector.getMetricsHistory(
                            resource, autoScalerInfo, flinkService, conf);

            if (collectedMetrics == null || collectedMetrics.getMetricHistory().isEmpty()) {
                LOG.info("No metrics were collected. Skipping scaling step");
                return false;
            }

            LOG.debug("Evaluating scaling metrics for {}", collectedMetrics);
            var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
            LOG.info("Scaling metrics evaluated: {}", evaluatedMetrics);
            lastEvaluatedMetrics.put(ResourceID.fromResource(resource), evaluatedMetrics);
            registerResourceScalingMetrics(resource);

            var specAdjusted =
                    scalingExecutor.scaleResource(resource, autoScalerInfo, conf, evaluatedMetrics);
            // Persist any state the executor recorded (e.g. scaling history) back to Kubernetes.
            autoScalerInfo.replaceInKubernetes(kubernetesClient);
            return specAdjusted;
        } catch (Exception e) {
            // Deliberate catch-all: scaling is best-effort and must not fail reconciliation.
            LOG.error("Error while scaling resource", e);
            return false;
        }
    }

    /**
     * Registers "Current" (and, where applicable, "Average") gauges for every evaluated scaling
     * metric of every job vertex of the resource, under the resource's "AutoScaler" metric group.
     * Already-registered vertices are skipped via {@link #registeredMetrics}.
     */
    private void registerResourceScalingMetrics(AbstractFlinkResource<?, ?> resource) {
        var resourceId = ResourceID.fromResource(resource);
        var scalerGroup =
                metricGroup
                        .createResourceNamespaceGroup(
                                configManager.getDefaultConfig(),
                                resource.getClass(),
                                resource.getMetadata().getNamespace())
                        .createResourceNamespaceGroup(
                                configManager.getDefaultConfig(), resource.getMetadata().getName())
                        .addGroup("AutoScaler");

        lastEvaluatedMetrics
                .get(resourceId)
                .forEach(
                        (jobVertexID, evaluated) -> {
                            // Set.add returns false if the vertex was registered before.
                            if (!registeredMetrics
                                    .computeIfAbsent(resourceId, r -> new HashSet<>())
                                    .add(jobVertexID)) {
                                return;
                            }
                            LOG.info("Registering scaling metrics for job vertex {}", jobVertexID);
                            var jobVertexMg =
                                    scalerGroup.addGroup("jobVertexID", jobVertexID.toHexString());

                            evaluated.forEach(
                                    (sm, esm) -> {
                                        var smGroup = jobVertexMg.addGroup(sm.name());

                                        // Gauges re-read lastEvaluatedMetrics on every poll so
                                        // they reflect the latest evaluation (null when the
                                        // resource's metrics have been cleaned up).
                                        smGroup.gauge(
                                                "Current",
                                                () ->
                                                        Optional.ofNullable(
                                                                        lastEvaluatedMetrics.get(
                                                                                resourceId))
                                                                .map(m -> m.get(jobVertexID))
                                                                .map(
                                                                        metrics ->
                                                                                metrics.get(sm)
                                                                                        .getCurrent())
                                                                .orElse(null));

                                        if (sm.isCalculateAverage()) {
                                            smGroup.gauge(
                                                    "Average",
                                                    () ->
                                                            Optional.ofNullable(
                                                                            lastEvaluatedMetrics
                                                                                    .get(
                                                                                            resourceId))
                                                                    .map(m -> m.get(jobVertexID))
                                                                    .map(
                                                                            metrics ->
                                                                                    metrics.get(sm)
                                                                                            .getAverage())
                                                                    .orElse(null));
                                        }
                                    });
                        });
    }

    /**
     * Factory wiring the default collector/evaluator/executor implementations.
     *
     * @param kubernetesClient Kubernetes client shared with the operator
     * @param configManager operator configuration manager
     * @param metricGroup root metric group for autoscaler metrics
     * @return fully wired JobAutoScaler instance
     */
    public static JobAutoScaler create(
            KubernetesClient kubernetesClient,
            FlinkConfigManager configManager,
            KubernetesOperatorMetricGroup metricGroup) {
        return new JobAutoScaler(
                kubernetesClient,
                configManager,
                new RestApiMetricsCollector(),
                new ScalingMetricEvaluator(),
                new ScalingExecutor(kubernetesClient),
                metricGroup);
    }
}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
new file mode 100644
index 00000000..d9a0b86b
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.time.Duration;
+
+import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig;
+
+/** Config options related to the autoscaler module. */
+public class AutoScalerOptions {
+
+    private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
+        return operatorConfig("job.autoscaler." + key);
+    }
+
+    public static final ConfigOption<Boolean> AUTOSCALER_ENABLED =
+            autoScalerConfig("enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Enable job autoscaler module.");
+
+    public static final ConfigOption<Boolean> SCALING_ENABLED =
+            autoScalerConfig("scaling.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Enable vertex scaling execution by the 
autoscaler. If disabled, the autoscaler will only collect metrics and evaluate 
the suggested parallelism for each vertex but will not upgrade the jobs.");
+
+    public static final ConfigOption<Duration> METRICS_WINDOW =
+            autoScalerConfig("metrics.window")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(5))
+                    .withDescription("Scaling metrics aggregation window 
size.");
+
+    public static final ConfigOption<Duration> STABILIZATION_INTERVAL =
+            autoScalerConfig("stabilization.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(5))
+                    .withDescription(
+                            "Stabilization period in which no new scaling will 
be executed");
+
+    public static final ConfigOption<Boolean> SOURCE_SCALING_ENABLED =
+            autoScalerConfig("scaling.sources.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to enable scaling source vertices. "
+                                    + "Source vertices set the baseline 
ingestion rate for the processing based on the backlog size. "
+                                    + "If disabled, only regular job vertices 
will be scaled and source vertices will be unchanged.");
+
+    public static final ConfigOption<Double> TARGET_UTILIZATION =
+            autoScalerConfig("target.utilization")
+                    .doubleType()
+                    .defaultValue(0.7)
+                    .withDescription("Target vertex utilization");
+
+    public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY =
+            autoScalerConfig("target.utilization.boundary")
+                    .doubleType()
+                    .defaultValue(0.1)
+                    .withDescription(
+                            "Target vertex utilization boundary. Scaling won't 
be performed if utilization is within (target - boundary, target + boundary)");
+
+    public static final ConfigOption<Duration> SCALE_UP_GRACE_PERIOD =
+            autoScalerConfig("scale-up.grace-period")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(10))
+                    .withDescription("Period in which no scale down is allowed 
after a scale up");
+
+    public static final ConfigOption<Integer> VERTEX_MIN_PARALLELISM =
+            autoScalerConfig("vertex.min-parallelism")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription("The minimum parallelism the autoscaler 
can use.");
+
+    public static final ConfigOption<Integer> VERTEX_MAX_PARALLELISM =
+            autoScalerConfig("vertex.max-parallelism")
+                    .intType()
+                    .defaultValue(Integer.MAX_VALUE)
+                    .withDescription(
+                            "The maximum parallelism the autoscaler can use. 
Note that this limit will be ignored if it is higher than the max parallelism 
configured in the Flink config or directly on each operator.");
+
+    public static final ConfigOption<Double> MAX_SCALE_DOWN_FACTOR =
+            autoScalerConfig("scale-down.max-factor")
+                    .doubleType()
+                    .defaultValue(0.6)
+                    .withDescription(
+                            "Max scale down factor. 1 means no limit on scale 
down, 0.6 means job can only be scaled down with 60% of the original 
parallelism.");
+
+    public static final ConfigOption<Duration> CATCH_UP_DURATION =
+            autoScalerConfig("catch-up.duration")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(10))
+                    .withDescription(
+                            "The target duration for fully processing any 
backlog after a scaling operation. Set to 0 to disable backlog based scaling.");
+
+    public static final ConfigOption<Duration> RESTART_TIME =
+            autoScalerConfig("restart.time")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(5))
+                    .withDescription(
+                            "Expected restart time to be used until the 
operator can determine it reliably from history.");
+}

Reply via email to