This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 8d285fe1397c24d740ac7baa85e9bc22eb03fff6 Author: Maximilian Michels <[email protected]> AuthorDate: Thu Dec 1 08:47:23 2022 +0100 [FLINK-30260][autoscaler] Define scaling metrics --- .../autoscaler/metrics/CollectedMetrics.java | 34 ++++ .../autoscaler/metrics/EvaluatedScalingMetric.java | 36 +++++ .../operator/autoscaler/metrics/ScalingMetric.java | 74 +++++++++ .../autoscaler/metrics/ScalingMetrics.java | 159 ++++++++++++++++++ .../autoscaler/metrics/ScalingMetricsTest.java | 178 +++++++++++++++++++++ 5 files changed, 481 insertions(+) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java new file mode 100644 index 00000000..60f1c410 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.autoscaler.metrics; + +import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import lombok.Value; + +import java.time.Instant; +import java.util.Map; +import java.util.SortedMap; + +/** Topology and collected metric history. */ +@Value +public class CollectedMetrics { + JobTopology jobTopology; + SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricHistory; +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java new file mode 100644 index 00000000..a2304128 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler.metrics; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** Evaluated scaling metric. 
*/ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class EvaluatedScalingMetric { + private double current; + + private double average; + + public static EvaluatedScalingMetric of(double value) { + return new EvaluatedScalingMetric(value, Double.NaN); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java new file mode 100644 index 00000000..d3103971 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler.metrics; + +/** + * Supported scaling metrics. These represent high level metrics computed from Flink job metrics + * that are used for scaling decisions in the autoscaler module. + */ +public enum ScalingMetric { + + /** Max subtask load (busy time ratio 0 (idle) to 1 (fully utilized)). */ + LOAD_MAX(true), + + /** Average subtask load (busy time ratio 0 (idle) to 1 (fully utilized)). 
/**
 * High level metrics computed from Flink job metrics that drive the scaling decisions of the
 * autoscaler module.
 */
public enum ScalingMetric {

    /** Maximum subtask load: busy time ratio between 0 (idle) and 1 (fully utilized). */
    LOAD_MAX(true),

    /** Average subtask load: busy time ratio between 0 (idle) and 1 (fully utilized). */
    LOAD_AVG(true),

    /** Processing rate at full capacity (records/sec). */
    TRUE_PROCESSING_RATE(true),

    /** Output rate at full capacity (records/sec). */
    TRUE_OUTPUT_RATE(true),

    /**
     * Incoming data rate to the source, e.g. rate of records written to the Kafka topic
     * (records/sec).
     */
    SOURCE_DATA_RATE(true),

    /** Target processing rate of operators as derived from source inputs (records/sec). */
    TARGET_DATA_RATE(true),

    /** Target processing rate of operators as derived from backlog (records/sec). */
    CATCH_UP_DATA_RATE(false),

    /** Number of outputs produced on average for every input record. */
    OUTPUT_RATIO(true),

    /** Total number of pending records. */
    LAG(false),

    /** Job vertex parallelism. */
    PARALLELISM(false),

    /** Job vertex max parallelism. */
    MAX_PARALLELISM(false),

    /** Upper boundary of the target data rate range. */
    SCALE_UP_RATE_THRESHOLD(false),

    /** Lower boundary of the target data rate range. */
    SCALE_DOWN_RATE_THRESHOLD(false);

    /** Whether a windowed average should be maintained for this metric. */
    private final boolean averaged;

    ScalingMetric(boolean averaged) {
        this.averaged = averaged;
    }

    /** Returns true when a windowed average is computed for this metric. */
    public boolean isCalculateAverage() {
        return averaged;
    }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler.metrics; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SOURCE_SCALING_ENABLED; + +/** Utilities for computing scaling metrics based on Flink metrics. 
*/ +public class ScalingMetrics { + + private static final Logger LOG = LoggerFactory.getLogger(ScalingMetrics.class); + + public static void computeLoadMetrics( + Map<FlinkMetric, AggregatedMetric> flinkMetrics, + Map<ScalingMetric, Double> scalingMetrics) { + + var busyTime = flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC); + if (busyTime == null) { + return; + } + + if (!busyTime.getAvg().isNaN()) { + scalingMetrics.put(ScalingMetric.LOAD_AVG, busyTime.getAvg() / 1000); + } + + if (!busyTime.getMax().isNaN()) { + scalingMetrics.put(ScalingMetric.LOAD_MAX, busyTime.getMax() / 1000); + } + } + + public static void computeDataRateMetrics( + JobVertexID jobVertexID, + Map<FlinkMetric, AggregatedMetric> flinkMetrics, + Map<ScalingMetric, Double> scalingMetrics, + JobTopology topology, + Optional<Double> lagGrowthOpt, + Configuration conf) { + + var source = topology.getInputs().get(jobVertexID).isEmpty(); + var sink = topology.getOutputs().get(jobVertexID).isEmpty(); + + var busyTime = flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC); + + if (busyTime == null || busyTime.getAvg().isNaN()) { + LOG.error("Cannot compute true processing/output rate without busyTimeMsPerSecond"); + return; + } + + var numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC); + if (numRecordsInPerSecond == null) { + numRecordsInPerSecond = + flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC); + } + + var outputPerSecond = flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC); + + double busyTimeMultiplier = 1000 / busyTime.getAvg(); + + if (source && !conf.getBoolean(SOURCE_SCALING_ENABLED)) { + double sourceInputRate = + numRecordsInPerSecond != null ? 
numRecordsInPerSecond.getSum() : Double.NaN; + + double targetDataRate; + if (!Double.isNaN(sourceInputRate) && sourceInputRate > 0) { + targetDataRate = sourceInputRate; + } else { + // If source in metric is not available (maybe legacy source) we use source + // output that should always be available + targetDataRate = + flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC).getSum(); + } + scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, Double.NaN); + scalingMetrics.put( + ScalingMetric.OUTPUT_RATIO, outputPerSecond.getSum() / targetDataRate); + var trueOutputRate = busyTimeMultiplier * outputPerSecond.getSum(); + scalingMetrics.put(ScalingMetric.TRUE_OUTPUT_RATE, trueOutputRate); + scalingMetrics.put(ScalingMetric.TARGET_DATA_RATE, trueOutputRate); + LOG.info( + "Scaling disabled for source {} using output rate {} as target", + jobVertexID, + trueOutputRate); + } else { + if (source) { + if (!lagGrowthOpt.isPresent() || numRecordsInPerSecond.getSum().isNaN()) { + LOG.error( + "Cannot compute source target data rate without numRecordsInPerSecond and pendingRecords (lag) metric for {}.", + jobVertexID); + scalingMetrics.put(ScalingMetric.TARGET_DATA_RATE, Double.NaN); + } else { + double sourceDataRate = + Math.max(0, numRecordsInPerSecond.getSum() + lagGrowthOpt.get()); + LOG.info( + "Using computed source data rate {} for {}", + sourceDataRate, + jobVertexID); + scalingMetrics.put(ScalingMetric.SOURCE_DATA_RATE, sourceDataRate); + } + } + + if (!numRecordsInPerSecond.getSum().isNaN()) { + double trueProcessingRate = busyTimeMultiplier * numRecordsInPerSecond.getSum(); + if (trueProcessingRate <= 0 || !Double.isFinite(trueProcessingRate)) { + trueProcessingRate = Double.NaN; + } + scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, trueProcessingRate); + } else { + LOG.error("Cannot compute true processing rate without numRecordsInPerSecond"); + } + + if (!sink) { + if (!outputPerSecond.getSum().isNaN()) { + scalingMetrics.put( + 
ScalingMetric.OUTPUT_RATIO, + outputPerSecond.getSum() / numRecordsInPerSecond.getSum()); + scalingMetrics.put( + ScalingMetric.TRUE_OUTPUT_RATE, + busyTimeMultiplier * outputPerSecond.getSum()); + } else { + LOG.error( + "Cannot compute processing and input rate without numRecordsOutPerSecond"); + } + } + } + } + + public static void computeLagMetrics( + Map<FlinkMetric, AggregatedMetric> flinkMetrics, + Map<ScalingMetric, Double> scalingMetrics) { + var pendingRecords = flinkMetrics.get(FlinkMetric.PENDING_RECORDS); + if (pendingRecords != null) { + scalingMetrics.put(ScalingMetric.LAG, pendingRecords.getSum()); + } + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java new file mode 100644 index 00000000..5aa84b1c --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.autoscaler.metrics; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; +import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; +import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Tests for scaling metrics computation logic. */ +public class ScalingMetricsTest { + + @Test + public void testProcessingAndOutputMetrics() { + var source = new JobVertexID(); + var op = new JobVertexID(); + var sink = new JobVertexID(); + + var topology = + new JobTopology( + new VertexInfo(source, Collections.emptySet(), 1, 1), + new VertexInfo(op, Set.of(source), 1, 1), + new VertexInfo(sink, Set.of(op), 1, 1)); + + Map<ScalingMetric, Double> scalingMetrics = new HashMap<>(); + ScalingMetrics.computeDataRateMetrics( + source, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 100., Double.NaN), + FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1000.), + FlinkMetric.NUM_RECORDS_OUT_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.)), + scalingMetrics, + topology, + Optional.of(15.), + new Configuration()); + + assertEquals( + Map.of( + ScalingMetric.TRUE_PROCESSING_RATE, + 10000., + ScalingMetric.TRUE_OUTPUT_RATE, + 20000., + ScalingMetric.OUTPUT_RATIO, + 2., + ScalingMetric.SOURCE_DATA_RATE, + 1015.), + scalingMetrics); + + // test negative lag growth (catch up) + scalingMetrics.clear(); + 
ScalingMetrics.computeDataRateMetrics( + source, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 100., Double.NaN), + FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1000.), + FlinkMetric.NUM_RECORDS_OUT_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.)), + scalingMetrics, + topology, + Optional.of(-50.), + new Configuration()); + + assertEquals( + Map.of( + ScalingMetric.TRUE_PROCESSING_RATE, + 10000., + ScalingMetric.TRUE_OUTPUT_RATE, + 20000., + ScalingMetric.OUTPUT_RATIO, + 2., + ScalingMetric.SOURCE_DATA_RATE, + 950.), + scalingMetrics); + + scalingMetrics.clear(); + ScalingMetrics.computeDataRateMetrics( + op, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 100., Double.NaN), + FlinkMetric.NUM_RECORDS_IN_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1000.), + FlinkMetric.NUM_RECORDS_OUT_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.)), + scalingMetrics, + topology, + Optional.empty(), + new Configuration()); + + assertEquals( + Map.of( + ScalingMetric.TRUE_PROCESSING_RATE, + 10000., + ScalingMetric.TRUE_OUTPUT_RATE, + 20000., + ScalingMetric.OUTPUT_RATIO, + 2.), + scalingMetrics); + } + + @Test + public void testSourceScalingDisabled() { + var source = new JobVertexID(); + + var topology = new JobTopology(new VertexInfo(source, Collections.emptySet(), 1, 1)); + + Configuration conf = new Configuration(); + // Disable scaling sources + conf.setBoolean(AutoScalerOptions.SOURCE_SCALING_ENABLED, false); + + Map<ScalingMetric, Double> scalingMetrics = new HashMap<>(); + ScalingMetrics.computeDataRateMetrics( + source, + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN), + FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 2000.), + FlinkMetric.NUM_RECORDS_OUT_PER_SEC, + new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 4000.)), + scalingMetrics, + topology, + Optional.empty(), + conf); + + // Sources are not scaled, the rates are solely computed on the basis of the true output + // rate + assertEquals(Double.NaN, scalingMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE)); + assertEquals(8000, scalingMetrics.get(ScalingMetric.TARGET_DATA_RATE)); + assertEquals(8000, scalingMetrics.get(ScalingMetric.TRUE_OUTPUT_RATE)); + assertEquals(2, scalingMetrics.get(ScalingMetric.OUTPUT_RATIO)); + } + + @Test + public void testLoadMetrics() { + Map<ScalingMetric, Double> scalingMetrics = new HashMap<>(); + ScalingMetrics.computeLoadMetrics( + Map.of( + FlinkMetric.BUSY_TIME_PER_SEC, + new AggregatedMetric("", Double.NaN, 200., 100., Double.NaN)), + scalingMetrics); + + assertEquals(0.2, scalingMetrics.get(ScalingMetric.LOAD_MAX)); + assertEquals(0.1, scalingMetrics.get(ScalingMetric.LOAD_AVG)); + } +}
