This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 8d285fe1397c24d740ac7baa85e9bc22eb03fff6
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Dec 1 08:47:23 2022 +0100

    [FLINK-30260][autoscaler] Define scaling metrics
---
 .../autoscaler/metrics/CollectedMetrics.java       |  34 ++++
 .../autoscaler/metrics/EvaluatedScalingMetric.java |  36 +++++
 .../operator/autoscaler/metrics/ScalingMetric.java |  74 +++++++++
 .../autoscaler/metrics/ScalingMetrics.java         | 159 ++++++++++++++++++
 .../autoscaler/metrics/ScalingMetricsTest.java     | 178 +++++++++++++++++++++
 5 files changed, 481 insertions(+)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
new file mode 100644
index 00000000..60f1c410
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.SortedMap;
+
/**
 * Topology and collected metric history.
 *
 * <p>Pairs the observed {@link JobTopology} with the scaling metrics collected for it,
 * keyed by collection timestamp (the {@link SortedMap} keeps entries in chronological
 * order) and, per timestamp, by job vertex.
 */
@Value
public class CollectedMetrics {

    /** Topology of the job the metrics below were collected for. */
    JobTopology jobTopology;

    /** Collected scaling metrics per collection time and job vertex. */
    SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricHistory;
}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
new file mode 100644
index 00000000..a2304128
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
/**
 * Evaluated scaling metric, holding a current value and an average value.
 *
 * <p>The average is {@code Double.NaN} when no average is tracked for the metric
 * (see {@link #of(double)}).
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class EvaluatedScalingMetric {

    // Current value of the metric.
    private double current;

    // Average value of the metric; NaN when no average is tracked.
    private double average;

    /**
     * Creates an evaluated metric that carries only a current value.
     *
     * @param value current metric value
     * @return evaluated metric with the average set to {@code Double.NaN}
     */
    public static EvaluatedScalingMetric of(double value) {
        return new EvaluatedScalingMetric(value, Double.NaN);
    }
}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
new file mode 100644
index 00000000..d3103971
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
/**
 * Supported scaling metrics. These represent high level metrics computed from Flink job
 * metrics that are used for scaling decisions in the autoscaler module.
 */
public enum ScalingMetric {

    /** Maximum subtask load (busy time ratio from 0 = idle to 1 = fully utilized). */
    LOAD_MAX(true),

    /** Average subtask load (busy time ratio from 0 = idle to 1 = fully utilized). */
    LOAD_AVG(true),

    /** Processing rate at full capacity (records/sec). */
    TRUE_PROCESSING_RATE(true),

    /** Output rate at full capacity (records/sec). */
    TRUE_OUTPUT_RATE(true),

    /**
     * Incoming data rate to the source, e.g. rate of records written to the Kafka topic
     * (records/sec).
     */
    SOURCE_DATA_RATE(true),

    /** Target processing rate of operators as derived from source inputs (records/sec). */
    TARGET_DATA_RATE(true),

    /** Target processing rate of operators as derived from backlog (records/sec). */
    CATCH_UP_DATA_RATE(false),

    /** Number of outputs produced on average for every input record. */
    OUTPUT_RATIO(true),

    /** Total number of pending records. */
    LAG(false),

    /** Job vertex parallelism. */
    PARALLELISM(false),

    /** Job vertex max parallelism. */
    MAX_PARALLELISM(false),

    /** Upper boundary of the target data rate range. */
    SCALE_UP_RATE_THRESHOLD(false),

    /** Lower boundary of the target data rate range. */
    SCALE_DOWN_RATE_THRESHOLD(false);

    /** Whether an average value should be calculated for this metric. */
    private final boolean averaged;

    ScalingMetric(boolean averaged) {
        this.averaged = averaged;
    }

    /** Returns {@code true} if an average should be calculated for this metric. */
    public boolean isCalculateAverage() {
        return averaged;
    }
}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
new file mode 100644
index 00000000..27b3d97d
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SOURCE_SCALING_ENABLED;
+
+/** Utilities for computing scaling metrics based on Flink metrics. */
+public class ScalingMetrics {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ScalingMetrics.class);
+
+    public static void computeLoadMetrics(
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            Map<ScalingMetric, Double> scalingMetrics) {
+
+        var busyTime = flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC);
+        if (busyTime == null) {
+            return;
+        }
+
+        if (!busyTime.getAvg().isNaN()) {
+            scalingMetrics.put(ScalingMetric.LOAD_AVG, busyTime.getAvg() / 
1000);
+        }
+
+        if (!busyTime.getMax().isNaN()) {
+            scalingMetrics.put(ScalingMetric.LOAD_MAX, busyTime.getMax() / 
1000);
+        }
+    }
+
+    public static void computeDataRateMetrics(
+            JobVertexID jobVertexID,
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            Map<ScalingMetric, Double> scalingMetrics,
+            JobTopology topology,
+            Optional<Double> lagGrowthOpt,
+            Configuration conf) {
+
+        var source = topology.getInputs().get(jobVertexID).isEmpty();
+        var sink = topology.getOutputs().get(jobVertexID).isEmpty();
+
+        var busyTime = flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC);
+
+        if (busyTime == null || busyTime.getAvg().isNaN()) {
+            LOG.error("Cannot compute true processing/output rate without 
busyTimeMsPerSecond");
+            return;
+        }
+
+        var numRecordsInPerSecond = 
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+        if (numRecordsInPerSecond == null) {
+            numRecordsInPerSecond =
+                    
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
+        }
+
+        var outputPerSecond = 
flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
+
+        double busyTimeMultiplier = 1000 / busyTime.getAvg();
+
+        if (source && !conf.getBoolean(SOURCE_SCALING_ENABLED)) {
+            double sourceInputRate =
+                    numRecordsInPerSecond != null ? 
numRecordsInPerSecond.getSum() : Double.NaN;
+
+            double targetDataRate;
+            if (!Double.isNaN(sourceInputRate) && sourceInputRate > 0) {
+                targetDataRate = sourceInputRate;
+            } else {
+                // If source in metric is not available (maybe legacy source) 
we use source
+                // output that should always be available
+                targetDataRate =
+                        
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC).getSum();
+            }
+            scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, Double.NaN);
+            scalingMetrics.put(
+                    ScalingMetric.OUTPUT_RATIO, outputPerSecond.getSum() / 
targetDataRate);
+            var trueOutputRate = busyTimeMultiplier * outputPerSecond.getSum();
+            scalingMetrics.put(ScalingMetric.TRUE_OUTPUT_RATE, trueOutputRate);
+            scalingMetrics.put(ScalingMetric.TARGET_DATA_RATE, trueOutputRate);
+            LOG.info(
+                    "Scaling disabled for source {} using output rate {} as 
target",
+                    jobVertexID,
+                    trueOutputRate);
+        } else {
+            if (source) {
+                if (!lagGrowthOpt.isPresent() || 
numRecordsInPerSecond.getSum().isNaN()) {
+                    LOG.error(
+                            "Cannot compute source target data rate without 
numRecordsInPerSecond and pendingRecords (lag) metric for {}.",
+                            jobVertexID);
+                    scalingMetrics.put(ScalingMetric.TARGET_DATA_RATE, 
Double.NaN);
+                } else {
+                    double sourceDataRate =
+                            Math.max(0, numRecordsInPerSecond.getSum() + 
lagGrowthOpt.get());
+                    LOG.info(
+                            "Using computed source data rate {} for {}",
+                            sourceDataRate,
+                            jobVertexID);
+                    scalingMetrics.put(ScalingMetric.SOURCE_DATA_RATE, 
sourceDataRate);
+                }
+            }
+
+            if (!numRecordsInPerSecond.getSum().isNaN()) {
+                double trueProcessingRate = busyTimeMultiplier * 
numRecordsInPerSecond.getSum();
+                if (trueProcessingRate <= 0 || 
!Double.isFinite(trueProcessingRate)) {
+                    trueProcessingRate = Double.NaN;
+                }
+                scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, 
trueProcessingRate);
+            } else {
+                LOG.error("Cannot compute true processing rate without 
numRecordsInPerSecond");
+            }
+
+            if (!sink) {
+                if (!outputPerSecond.getSum().isNaN()) {
+                    scalingMetrics.put(
+                            ScalingMetric.OUTPUT_RATIO,
+                            outputPerSecond.getSum() / 
numRecordsInPerSecond.getSum());
+                    scalingMetrics.put(
+                            ScalingMetric.TRUE_OUTPUT_RATE,
+                            busyTimeMultiplier * outputPerSecond.getSum());
+                } else {
+                    LOG.error(
+                            "Cannot compute processing and input rate without 
numRecordsOutPerSecond");
+                }
+            }
+        }
+    }
+
+    public static void computeLagMetrics(
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            Map<ScalingMetric, Double> scalingMetrics) {
+        var pendingRecords = flinkMetrics.get(FlinkMetric.PENDING_RECORDS);
+        if (pendingRecords != null) {
+            scalingMetrics.put(ScalingMetric.LAG, pendingRecords.getSum());
+        }
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
new file mode 100644
index 00000000..5aa84b1c
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
/** Tests for scaling metrics computation logic. */
public class ScalingMetricsTest {

    @Test
    public void testProcessingAndOutputMetrics() {
        // Simple linear topology: source -> op -> sink, parallelism/maxParallelism 1.
        var source = new JobVertexID();
        var op = new JobVertexID();
        var sink = new JobVertexID();

        var topology =
                new JobTopology(
                        new VertexInfo(source, Collections.emptySet(), 1, 1),
                        new VertexInfo(op, Set.of(source), 1, 1),
                        new VertexInfo(sink, Set.of(op), 1, 1));

        // Source vertex: busy 100 ms/s (avg), 1000 rec/s in (sum), 2000 rec/s out (sum),
        // lag growing by 15 rec/s.
        Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
        ScalingMetrics.computeDataRateMetrics(
                source,
                Map.of(
                        FlinkMetric.BUSY_TIME_PER_SEC,
                        new AggregatedMetric("", Double.NaN, Double.NaN, 100., Double.NaN),
                        FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                        new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1000.),
                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                        new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.)),
                scalingMetrics,
                topology,
                Optional.of(15.),
                new Configuration());

        // 100 ms/s busy -> multiplier 10: observed rates scale 10x to full capacity;
        // source data rate = input rate + lag growth = 1000 + 15.
        assertEquals(
                Map.of(
                        ScalingMetric.TRUE_PROCESSING_RATE,
                        10000.,
                        ScalingMetric.TRUE_OUTPUT_RATE,
                        20000.,
                        ScalingMetric.OUTPUT_RATIO,
                        2.,
                        ScalingMetric.SOURCE_DATA_RATE,
                        1015.),
                scalingMetrics);

        // test negative lag growth (catch up)
        scalingMetrics.clear();
        ScalingMetrics.computeDataRateMetrics(
                source,
                Map.of(
                        FlinkMetric.BUSY_TIME_PER_SEC,
                        new AggregatedMetric("", Double.NaN, Double.NaN, 100., Double.NaN),
                        FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                        new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1000.),
                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                        new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.)),
                scalingMetrics,
                topology,
                Optional.of(-50.),
                new Configuration());

        // Shrinking lag lowers the source data rate below the observed input rate:
        // 1000 + (-50) = 950.
        assertEquals(
                Map.of(
                        ScalingMetric.TRUE_PROCESSING_RATE,
                        10000.,
                        ScalingMetric.TRUE_OUTPUT_RATE,
                        20000.,
                        ScalingMetric.OUTPUT_RATIO,
                        2.,
                        ScalingMetric.SOURCE_DATA_RATE,
                        950.),
                scalingMetrics);

        // Non-source vertex (op): lag info not needed, no SOURCE_DATA_RATE expected.
        scalingMetrics.clear();
        ScalingMetrics.computeDataRateMetrics(
                op,
                Map.of(
                        FlinkMetric.BUSY_TIME_PER_SEC,
                        new AggregatedMetric("", Double.NaN, Double.NaN, 100., Double.NaN),
                        FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                        new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1000.),
                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                        new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.)),
                scalingMetrics,
                topology,
                Optional.empty(),
                new Configuration());

        assertEquals(
                Map.of(
                        ScalingMetric.TRUE_PROCESSING_RATE,
                        10000.,
                        ScalingMetric.TRUE_OUTPUT_RATE,
                        20000.,
                        ScalingMetric.OUTPUT_RATIO,
                        2.),
                scalingMetrics);
    }

    @Test
    public void testSourceScalingDisabled() {
        var source = new JobVertexID();

        var topology = new JobTopology(new VertexInfo(source, Collections.emptySet(), 1, 1));

        Configuration conf = new Configuration();
        // Disable scaling sources
        conf.setBoolean(AutoScalerOptions.SOURCE_SCALING_ENABLED, false);

        // Busy 500 ms/s -> multiplier 2; output 4000 rec/s, source task output 2000 rec/s.
        Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
        ScalingMetrics.computeDataRateMetrics(
                source,
                Map.of(
                        FlinkMetric.BUSY_TIME_PER_SEC,
                        new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
                        FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
                        new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.),
                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                        new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 4000.)),
                scalingMetrics,
                topology,
                Optional.empty(),
                conf);

        // Sources are not scaled, the rates are solely computed on the basis of the true output
        // rate: true output = 2 * 4000 = 8000 (also the target), ratio = 4000 / 2000 = 2.
        assertEquals(Double.NaN, scalingMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE));
        assertEquals(8000, scalingMetrics.get(ScalingMetric.TARGET_DATA_RATE));
        assertEquals(8000, scalingMetrics.get(ScalingMetric.TRUE_OUTPUT_RATE));
        assertEquals(2, scalingMetrics.get(ScalingMetric.OUTPUT_RATIO));
    }

    @Test
    public void testLoadMetrics() {
        // Busy time avg 100 ms/s, max 200 ms/s -> load 0.1 (avg) and 0.2 (max).
        Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
        ScalingMetrics.computeLoadMetrics(
                Map.of(
                        FlinkMetric.BUSY_TIME_PER_SEC,
                        new AggregatedMetric("", Double.NaN, 200., 100., Double.NaN)),
                scalingMetrics);

        assertEquals(0.2, scalingMetrics.get(ScalingMetric.LOAD_MAX));
        assertEquals(0.1, scalingMetrics.get(ScalingMetric.LOAD_AVG));
    }
}

Reply via email to