This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 53b1639ab2fe53f6e8b5aac707d325f86f54b603 Author: Matyas Orhidi <[email protected]> AuthorDate: Thu Dec 1 08:58:10 2022 +0100 [FLINK-30260][autoscaler] Add JobAutoScaler component and configs --- .../operator/autoscaler/AutoScalerInfo.java | 189 +++++++++++++++++++ .../kubernetes/operator/autoscaler/Cleanup.java | 30 +++ .../operator/autoscaler/JobAutoScaler.java | 204 +++++++++++++++++++++ .../autoscaler/config/AutoScalerOptions.java | 121 ++++++++++++ 4 files changed, 544 insertions(+) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java new file mode 100644 index 00000000..1236cc65 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.utils.JobVertexSerDeModule; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.client.KubernetesClient; +import lombok.SneakyThrows; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.SortedMap; +import java.util.TreeMap; + +/** Class for encapsulating information stored for each resource when using the autoscaler. 
*/ +public class AutoScalerInfo { + + private static final Logger LOG = LoggerFactory.getLogger(JobAutoScaler.class); + + private static final int SCALING_HISTORY_MAX_COUNT = 5; + private static final Duration SCALING_HISTORY_MAX_DURATION = Duration.ofHours(24); + + private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler"; + + private static final String COLLECTED_METRICS_KEY = "collectedMetrics"; + private static final String SCALING_HISTORY_KEY = "scalingHistory"; + private static final String JOB_START_TS_KEY = "jobStartTs"; + + private static final ObjectMapper YAML_MAPPER = + new ObjectMapper(new YAMLFactory()) + .registerModule(new JavaTimeModule()) + .registerModule(new JobVertexSerDeModule()); + + private final ConfigMap configMap; + private Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory; + + public AutoScalerInfo(ConfigMap configMap) { + this.configMap = configMap; + } + + @VisibleForTesting + public AutoScalerInfo(Map<String, String> data) { + this(new ConfigMap()); + configMap.setData(Preconditions.checkNotNull(data)); + } + + @SneakyThrows + public SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> getMetricHistory() { + var historyYaml = configMap.getData().get(COLLECTED_METRICS_KEY); + if (historyYaml == null) { + return new TreeMap<>(); + } + + return YAML_MAPPER.readValue(historyYaml, new TypeReference<>() {}); + } + + @SneakyThrows + public void updateMetricHistory( + Instant jobStartTs, + SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> history) { + configMap.getData().put(COLLECTED_METRICS_KEY, YAML_MAPPER.writeValueAsString(history)); + configMap.getData().put(JOB_START_TS_KEY, jobStartTs.toString()); + } + + public void clearMetricHistory() { + configMap.getData().remove(COLLECTED_METRICS_KEY); + configMap.getData().remove(JOB_START_TS_KEY); + } + + public Optional<Instant> getJobStartTs() { + return 
Optional.ofNullable(configMap.getData().get(JOB_START_TS_KEY)).map(Instant::parse); + } + + @SneakyThrows + public Map<JobVertexID, SortedMap<Instant, ScalingSummary>> getScalingHistory() { + if (scalingHistory != null) { + return scalingHistory; + } + var yaml = configMap.getData().get(SCALING_HISTORY_KEY); + scalingHistory = + yaml == null + ? new HashMap<>() + : YAML_MAPPER.readValue(yaml, new TypeReference<>() {}); + return scalingHistory; + } + + @SneakyThrows + public void addToScalingHistory(Instant now, Map<JobVertexID, ScalingSummary> summaries) { + // Make sure to init history + getScalingHistory(); + + summaries.forEach( + (id, summary) -> + scalingHistory.computeIfAbsent(id, j -> new TreeMap<>()).put(now, summary)); + + var entryIt = scalingHistory.entrySet().iterator(); + while (entryIt.hasNext()) { + var entry = entryIt.next(); + // Limit how long past scaling decisions are remembered + entry.setValue(entry.getValue().tailMap(now.minus(SCALING_HISTORY_MAX_DURATION))); + var vertexHistory = entry.getValue(); + while (vertexHistory.size() > SCALING_HISTORY_MAX_COUNT) { + vertexHistory.remove(vertexHistory.lastKey()); + } + if (vertexHistory.isEmpty()) { + entryIt.remove(); + } + } + + configMap + .getData() + .put(SCALING_HISTORY_KEY, YAML_MAPPER.writeValueAsString(scalingHistory)); + } + + public void replaceInKubernetes(KubernetesClient client) { + client.resource(configMap).replace(); + } + + public static AutoScalerInfo forResource( + AbstractFlinkResource<?, ?> cr, KubernetesClient kubeClient) { + + var objectMeta = new ObjectMeta(); + objectMeta.setName("autoscaler-" + cr.getMetadata().getName()); + objectMeta.setNamespace(cr.getMetadata().getNamespace()); + + ConfigMap infoCm = + getScalingInfoConfigMap(objectMeta, kubeClient) + .orElseGet( + () -> { + LOG.info("Creating scaling info config map"); + + objectMeta.setLabels( + Map.of( + Constants.LABEL_COMPONENT_KEY, + LABEL_COMPONENT_AUTOSCALER, + Constants.LABEL_APP_KEY, + 
cr.getMetadata().getName())); + var cm = new ConfigMap(); + cm.setMetadata(objectMeta); + cm.addOwnerReference(cr); + cm.setData(new HashMap<>()); + return kubeClient.resource(cm).create(); + }); + + return new AutoScalerInfo(infoCm); + } + + private static Optional<ConfigMap> getScalingInfoConfigMap( + ObjectMeta objectMeta, KubernetesClient kubeClient) { + return Optional.ofNullable( + kubeClient + .configMaps() + .inNamespace(objectMeta.getNamespace()) + .withName(objectMeta.getName()) + .get()); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java new file mode 100644 index 00000000..ae1b98bb --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; + +/** Cleanup interface for autoscaling related metadata. 
*/ +public interface Cleanup { + /** + * Cleans up the autoscaling related metadata kept for the given custom resource. + * + * <p>Method is called when a custom resource is deleted. + * + * @param cr custom resource that was deleted + */ + void cleanup(AbstractFlinkResource<?, ?> cr); +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java new file mode 100644 index 00000000..d9fb0041 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup; +import org.apache.flink.kubernetes.operator.service.FlinkService; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; + +/** Application and SessionJob autoscaler. 
*/ +public class JobAutoScaler implements Cleanup { + + private static final Logger LOG = LoggerFactory.getLogger(JobAutoScaler.class); + + private final KubernetesClient kubernetesClient; + private final FlinkConfigManager configManager; + private final ScalingMetricCollector metricsCollector; + private final ScalingMetricEvaluator evaluator; + private final ScalingExecutor scalingExecutor; + private final KubernetesOperatorMetricGroup metricGroup; + + private final Map<ResourceID, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>> + lastEvaluatedMetrics = new ConcurrentHashMap<>(); + private final Map<ResourceID, Set<JobVertexID>> registeredMetrics = new ConcurrentHashMap<>(); + + public JobAutoScaler( + KubernetesClient kubernetesClient, + FlinkConfigManager configManager, + ScalingMetricCollector metricsCollector, + ScalingMetricEvaluator evaluator, + ScalingExecutor scalingExecutor, + KubernetesOperatorMetricGroup metricGroup) { + this.kubernetesClient = kubernetesClient; + + this.configManager = configManager; + this.metricsCollector = metricsCollector; + this.evaluator = evaluator; + this.scalingExecutor = scalingExecutor; + this.metricGroup = metricGroup; + } + + @Override + public void cleanup(AbstractFlinkResource<?, ?> cr) { + LOG.info("Cleaning up autoscaling meta data"); + metricsCollector.cleanup(cr); + scalingExecutor.cleanup(cr); + var resourceId = ResourceID.fromResource(cr); + lastEvaluatedMetrics.remove(resourceId); + registeredMetrics.remove(resourceId); + } + + public boolean scale( + AbstractFlinkResource<?, ?> resource, + FlinkService flinkService, + Configuration conf, + Context<?> context) { + + if (resource.getSpec().getJob() == null || !conf.getBoolean(AUTOSCALER_ENABLED)) { + LOG.info("Job autoscaler is disabled"); + return false; + } + + if (!resource.getStatus().getJobStatus().getState().equals(JobStatus.RUNNING.name())) { + LOG.info("Job autoscaler is waiting for RUNNING job state"); + return false; + } + + try { + var 
autoScalerInfo = AutoScalerInfo.forResource(resource, kubernetesClient); + + LOG.info("Collecting metrics for scaling"); + var collectedMetrics = + metricsCollector.getMetricsHistory( + resource, autoScalerInfo, flinkService, conf); + + if (collectedMetrics == null || collectedMetrics.getMetricHistory().isEmpty()) { + LOG.info("No metrics were collected. Skipping scaling step"); + return false; + } + + LOG.debug("Evaluating scaling metrics for {}", collectedMetrics); + var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics); + LOG.info("Scaling metrics evaluated: {}", evaluatedMetrics); + lastEvaluatedMetrics.put(ResourceID.fromResource(resource), evaluatedMetrics); + registerResourceScalingMetrics(resource); + + var specAdjusted = + scalingExecutor.scaleResource(resource, autoScalerInfo, conf, evaluatedMetrics); + autoScalerInfo.replaceInKubernetes(kubernetesClient); + return specAdjusted; + } catch (Exception e) { + LOG.error("Error while scaling resource", e); + return false; + } + } + + private void registerResourceScalingMetrics(AbstractFlinkResource<?, ?> resource) { + var resourceId = ResourceID.fromResource(resource); + var scalerGroup = + metricGroup + .createResourceNamespaceGroup( + configManager.getDefaultConfig(), + resource.getClass(), + resource.getMetadata().getNamespace()) + .createResourceNamespaceGroup( + configManager.getDefaultConfig(), resource.getMetadata().getName()) + .addGroup("AutoScaler"); + + lastEvaluatedMetrics + .get(resourceId) + .forEach( + (jobVertexID, evaluated) -> { + if (!registeredMetrics + .computeIfAbsent(resourceId, r -> new HashSet<>()) + .add(jobVertexID)) { + return; + } + LOG.info("Registering scaling metrics for job vertex {}", jobVertexID); + var jobVertexMg = + scalerGroup.addGroup("jobVertexID", jobVertexID.toHexString()); + + evaluated.forEach( + (sm, esm) -> { + var smGroup = jobVertexMg.addGroup(sm.name()); + + smGroup.gauge( + "Current", + () -> + Optional.ofNullable( + lastEvaluatedMetrics.get( + 
resourceId)) + .map(m -> m.get(jobVertexID)) + .map( + metrics -> + metrics.get(sm) + .getCurrent()) + .orElse(null)); + + if (sm.isCalculateAverage()) { + smGroup.gauge( + "Average", + () -> + Optional.ofNullable( + lastEvaluatedMetrics + .get( + resourceId)) + .map(m -> m.get(jobVertexID)) + .map( + metrics -> + metrics.get(sm) + .getAverage()) + .orElse(null)); + } + }); + }); + } + + public static JobAutoScaler create( + KubernetesClient kubernetesClient, + FlinkConfigManager configManager, + KubernetesOperatorMetricGroup metricGroup) { + return new JobAutoScaler( + kubernetesClient, + configManager, + new RestApiMetricsCollector(), + new ScalingMetricEvaluator(), + new ScalingExecutor(kubernetesClient), + metricGroup); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java new file mode 100644 index 00000000..d9a0b86b --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler.config; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +import java.time.Duration; + +import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig; + +/** Config options related to the autoscaler module. */ +public class AutoScalerOptions { + + /** + * Creates an option builder for an autoscaler option by passing {@code "job.autoscaler." + key} + * to {@code operatorConfig}. + * + * @param key option key relative to the {@code job.autoscaler.} prefix + */ + private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { + return operatorConfig("job.autoscaler." + key); + } + + /** Switch for the autoscaler module as a whole; defaults to {@code false}. */ + public static final ConfigOption<Boolean> AUTOSCALER_ENABLED = + autoScalerConfig("enabled") + .booleanType() + .defaultValue(false) + .withDescription("Enable job autoscaler module."); + + /** Whether evaluated scaling decisions are executed; defaults to {@code true}. */ + public static final ConfigOption<Boolean> SCALING_ENABLED = + autoScalerConfig("scaling.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs."); + + /** Metric aggregation window; defaults to 5 minutes. */ + public static final ConfigOption<Duration> METRICS_WINDOW = + autoScalerConfig("metrics.window") + .durationType() + .defaultValue(Duration.ofMinutes(5)) + .withDescription("Scaling metrics aggregation window size."); + + /** Stabilization period; defaults to 5 minutes. */ + public static final ConfigOption<Duration> STABILIZATION_INTERVAL = + autoScalerConfig("stabilization.interval") + .durationType() + .defaultValue(Duration.ofMinutes(5)) + .withDescription( + "Stabilization period in which no new scaling will be executed"); + + /** Whether source vertices are scaled; defaults to {@code true}. */ + public static final ConfigOption<Boolean> SOURCE_SCALING_ENABLED = + autoScalerConfig("scaling.sources.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to enable scaling source vertices. 
" + + "Source vertices set the baseline ingestion rate for the processing based on the backlog size. " + + "If disabled, only regular job vertices will be scaled and source vertices will be unchanged."); + + /** Target vertex utilization; defaults to 0.7. */ + public static final ConfigOption<Double> TARGET_UTILIZATION = + autoScalerConfig("target.utilization") + .doubleType() + .defaultValue(0.7) + .withDescription("Target vertex utilization"); + + /** Tolerated deviation around the target utilization; defaults to 0.1. */ + public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY = + autoScalerConfig("target.utilization.boundary") + .doubleType() + .defaultValue(0.1) + .withDescription( + "Target vertex utilization boundary. Scaling won't be performed if utilization is within (target - boundary, target + boundary)"); + + /** Grace period after a scale up; defaults to 10 minutes. */ + public static final ConfigOption<Duration> SCALE_UP_GRACE_PERIOD = + autoScalerConfig("scale-up.grace-period") + .durationType() + .defaultValue(Duration.ofMinutes(10)) + .withDescription("Period in which no scale down is allowed after a scale up"); + + /** Lower parallelism bound; defaults to 1. */ + public static final ConfigOption<Integer> VERTEX_MIN_PARALLELISM = + autoScalerConfig("vertex.min-parallelism") + .intType() + .defaultValue(1) + .withDescription("The minimum parallelism the autoscaler can use."); + + /** Upper parallelism bound; defaults to {@code Integer.MAX_VALUE}. */ + public static final ConfigOption<Integer> VERTEX_MAX_PARALLELISM = + autoScalerConfig("vertex.max-parallelism") + .intType() + .defaultValue(Integer.MAX_VALUE) + .withDescription( + "The maximum parallelism the autoscaler can use. Note that this limit will be ignored if it is higher than the max parallelism configured in the Flink config or directly on each operator."); + + /** Scale down factor limit; defaults to 0.6. */ + public static final ConfigOption<Double> MAX_SCALE_DOWN_FACTOR = + autoScalerConfig("scale-down.max-factor") + .doubleType() + .defaultValue(0.6) + .withDescription( + "Max scale down factor. 
1 means no limit on scale down, 0.6 means job can only be scaled down with 60% of the original parallelism."); + + /** Target duration for working off backlog after scaling; defaults to 10 minutes. */ + public static final ConfigOption<Duration> CATCH_UP_DURATION = + autoScalerConfig("catch-up.duration") + .durationType() + .defaultValue(Duration.ofMinutes(10)) + .withDescription( + "The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling."); + + /** Assumed restart time; defaults to 5 minutes. */ + public static final ConfigOption<Duration> RESTART_TIME = + autoScalerConfig("restart.time") + .durationType() + .defaultValue(Duration.ofMinutes(5)) + .withDescription( + "Expected restart time to be used until the operator can determine it reliably from history."); +}
