This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit d535333aae312fdd3ea867743d6acfa006a95048
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Dec 1 08:48:51 2022 +0100

    [FLINK-30260][autoscaler] Collect and compute scaling metrics through Flink 
REST API
---
 .../autoscaler/RestApiMetricsCollector.java        | 100 +++++
 .../autoscaler/ScalingMetricCollector.java         | 443 +++++++++++++++++++++
 .../operator/autoscaler/metrics/FlinkMetric.java   |  49 +++
 .../rest/messages/job/metrics/IOMetricsInfo.java   | 191 +++++++++
 .../MetricsCollectionAndEvaluationTest.java        | 251 ++++++++++++
 .../autoscaler/TestingMetricsCollector.java        |  79 ++++
 6 files changed, 1113 insertions(+)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
new file mode 100644
index 00000000..9c359502
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Metric collector using flink rest api. */
+public class RestApiMetricsCollector extends ScalingMetricCollector {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RestApiMetricsCollector.class);
+
+    @Override
+    protected Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> 
queryAllAggregatedMetrics(
+            AbstractFlinkResource<?, ?> cr,
+            FlinkService flinkService,
+            Configuration conf,
+            Map<JobVertexID, Map<String, FlinkMetric>> 
filteredVertexMetricNames) {
+
+        return filteredVertexMetricNames.entrySet().stream()
+                .collect(
+                        Collectors.toMap(
+                                e -> e.getKey(),
+                                e ->
+                                        queryAggregatedVertexMetrics(
+                                                flinkService, cr, conf, 
e.getKey(), e.getValue())));
+    }
+
+    @SneakyThrows
+    protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
+            FlinkService flinkService,
+            AbstractFlinkResource<?, ?> cr,
+            Configuration conf,
+            JobVertexID jobVertexID,
+            Map<String, FlinkMetric> metrics) {
+
+        LOG.info("Querying metrics {} for {}", metrics, jobVertexID);
+
+        var jobId = 
JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());
+
+        var parameters = new AggregatedSubtaskMetricsParameters();
+        var pathIt = parameters.getPathParameters().iterator();
+
+        ((JobIDPathParameter) pathIt.next()).resolve(jobId);
+        ((JobVertexIdPathParameter) pathIt.next()).resolve(jobVertexID);
+
+        parameters
+                .getQueryParameters()
+                .iterator()
+                .next()
+                .resolveFromString(StringUtils.join(metrics.keySet(), ","));
+
+        try (var restClient = (RestClusterClient<String>) 
flinkService.getClusterClient(conf)) {
+
+            var responseBody =
+                    restClient
+                            .sendRequest(
+                                    
AggregatedSubtaskMetricsHeaders.getInstance(),
+                                    parameters,
+                                    EmptyRequestBody.getInstance())
+                            .get();
+
+            return responseBody.getMetrics().stream()
+                    .collect(Collectors.toMap(m -> metrics.get(m.getId()), m 
-> m));
+        }
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
new file mode 100644
index 00000000..993e6760
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetrics;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.util.Preconditions;
+
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import lombok.SneakyThrows;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SOURCE_SCALING_ENABLED;
+
+/** Metric collector using flink rest api. */
+public abstract class ScalingMetricCollector implements Cleanup {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ScalingMetricCollector.class);
+
+    private final Map<ResourceID, Tuple2<Long, Map<JobVertexID, Map<String, 
FlinkMetric>>>>
+            availableVertexMetricNames = new ConcurrentHashMap<>();
+
+    private final Map<ResourceID, SortedMap<Instant, Map<JobVertexID, 
Map<ScalingMetric, Double>>>>
+            histories = new ConcurrentHashMap<>();
+
+    private final Map<ResourceID, JobTopology> topologies = new 
ConcurrentHashMap<>();
+
+    private Clock clock = Clock.systemDefaultZone();
+
+    public CollectedMetrics getMetricsHistory(
+            AbstractFlinkResource<?, ?> cr,
+            AutoScalerInfo scalingInformation,
+            FlinkService flinkService,
+            Configuration conf)
+            throws Exception {
+
+        var resourceID = ResourceID.fromResource(cr);
+        var currentJobStartTs =
+                
Instant.ofEpochMilli(Long.parseLong(cr.getStatus().getJobStatus().getStartTime()));
+
+        if (!currentJobStartTs.equals(
+                scalingInformation.getJobStartTs().orElse(currentJobStartTs))) 
{
+            scalingInformation.clearMetricHistory();
+            cleanup(cr);
+        }
+
+        // Initialize metric history
+        var scalingMetricHistory =
+                histories.compute(
+                        resourceID,
+                        (k, h) -> {
+                            if (h == null) {
+                                h = scalingInformation.getMetricHistory();
+                            }
+                            return h.tailMap(
+                                    clock.instant()
+                                            
.minus(conf.get(AutoScalerOptions.METRICS_WINDOW)));
+                        });
+
+        var topology = getJobTopology(flinkService, cr, conf);
+
+        // The filtered list of metrics we want to query for each vertex
+        var filteredVertexMetricNames = queryFilteredMetricNames(flinkService, 
cr, conf, topology);
+
+        // Aggregated job vertex metrics collected from Flink based on the 
filtered metric names
+        var collectedVertexMetrics =
+                queryAllAggregatedMetrics(cr, flinkService, conf, 
filteredVertexMetricNames);
+
+        // The computed scaling metrics based on the collected aggregated 
vertex metrics
+        var scalingMetrics =
+                convertToScalingMetrics(resourceID, collectedVertexMetrics, 
topology, conf);
+
+        // Add scaling metrics to history if they were computed successfully
+        scalingMetricHistory.put(clock.instant(), scalingMetrics);
+        scalingInformation.updateMetricHistory(currentJobStartTs, 
scalingMetricHistory);
+
+        return new CollectedMetrics(topology, scalingMetricHistory);
+    }
+
+    protected JobTopology getJobTopology(
+            FlinkService flinkService, AbstractFlinkResource<?, ?> cr, 
Configuration conf)
+            throws Exception {
+
+        try (var restClient = (RestClusterClient<String>) 
flinkService.getClusterClient(conf)) {
+            var jobId = 
JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());
+            var topology =
+                    topologies.computeIfAbsent(
+                            ResourceID.fromResource(cr), r -> 
queryJobTopology(restClient, jobId));
+            updateKafkaSourceMaxParallelisms(restClient, jobId, topology);
+            return topology;
+        }
+    }
+
+    @VisibleForTesting
+    protected JobTopology queryJobTopology(RestClusterClient<String> 
restClient, JobID jobId) {
+        try {
+            var jobDetailsInfo = restClient.getJobDetails(jobId).get();
+
+            Map<JobVertexID, Integer> maxParallelismMap =
+                    jobDetailsInfo.getJobVertexInfos().stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            
JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID,
+                                            JobDetailsInfo.JobVertexDetailsInfo
+                                                    ::getMaxParallelism));
+
+            return JobTopology.fromJsonPlan(jobDetailsInfo.getJsonPlan(), 
maxParallelismMap);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void updateKafkaSourceMaxParallelisms(
+            RestClusterClient<String> restClient, JobID jobId, JobTopology 
topology)
+            throws Exception {
+        for (Map.Entry<JobVertexID, Set<JobVertexID>> entry : 
topology.getInputs().entrySet()) {
+            if (entry.getValue().isEmpty()) {
+                var sourceVertex = entry.getKey();
+                queryAggregatedMetricNames(restClient, jobId, 
sourceVertex).stream()
+                        .map(AggregatedMetric::getId)
+                        .filter(s -> s.endsWith(".currentOffset"))
+                        .mapToInt(
+                                s -> {
+                                    // We extract the partition from the 
pattern:
+                                    // 
...topic.[topic].partition.3.currentOffset
+                                    var split = s.split("\\.");
+                                    return Integer.parseInt(split[split.length 
- 2]);
+                                })
+                        .max()
+                        .ifPresent(
+                                p -> {
+                                    LOG.info(
+                                            "Updating source {} max 
parallelism based on available partitions to {}",
+                                            sourceVertex,
+                                            p + 1);
+                                    
topology.updateMaxParallelism(sourceVertex, p + 1);
+                                });
+            }
+        }
+    }
+
+    private List<JobVertexID> getVertexList(
+            FlinkService flinkService, AbstractFlinkResource<?, ?> cr, 
Configuration conf)
+            throws Exception {
+        JobTopology topology = getJobTopology(flinkService, cr, conf);
+        return new ArrayList<>(topology.getParallelisms().keySet());
+    }
+
+    /**
+     * Given a map of collected Flink vertex metrics we compute the scaling 
metrics for each job
+     * vertex.
+     *
+     * @param collectedMetrics Collected metrics for all job vertices.
+     * @return Computed scaling metrics for all job vertices.
+     */
+    private Map<JobVertexID, Map<ScalingMetric, Double>> 
convertToScalingMetrics(
+            ResourceID resourceID,
+            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> 
collectedMetrics,
+            JobTopology jobTopology,
+            Configuration conf) {
+
+        var out = new HashMap<JobVertexID, Map<ScalingMetric, Double>>();
+        collectedMetrics.forEach(
+                (jobVertexID, vertexFlinkMetrics) -> {
+                    LOG.info(
+                            "Calculating vertex scaling metrics for {} from 
{}",
+                            jobVertexID,
+                            vertexFlinkMetrics);
+                    var vertexScalingMetrics = new HashMap<ScalingMetric, 
Double>();
+                    out.put(jobVertexID, vertexScalingMetrics);
+
+                    ScalingMetrics.computeLagMetrics(vertexFlinkMetrics, 
vertexScalingMetrics);
+                    ScalingMetrics.computeLoadMetrics(vertexFlinkMetrics, 
vertexScalingMetrics);
+
+                    Optional<Double> lagGrowthRate =
+                            computeLagGrowthRate(
+                                    resourceID,
+                                    jobVertexID,
+                                    
vertexScalingMetrics.get(ScalingMetric.LAG));
+
+                    ScalingMetrics.computeDataRateMetrics(
+                            jobVertexID,
+                            vertexFlinkMetrics,
+                            vertexScalingMetrics,
+                            jobTopology,
+                            lagGrowthRate,
+                            conf);
+
+                    LOG.info(
+                            "Vertex scaling metrics for {}: {}", jobVertexID, 
vertexScalingMetrics);
+                });
+
+        return out;
+    }
+
+    @NotNull
+    private Optional<Double> computeLagGrowthRate(
+            ResourceID resourceID, JobVertexID jobVertexID, Double currentLag) 
{
+        var metricHistory = histories.get(resourceID);
+
+        if (metricHistory == null || metricHistory.isEmpty()) {
+            return Optional.empty();
+        }
+
+        var lastCollectionTime = metricHistory.lastKey();
+        var lastCollectedMetrics = 
metricHistory.get(lastCollectionTime).get(jobVertexID);
+
+        if (lastCollectedMetrics == null) {
+            return Optional.empty();
+        }
+
+        var lastLag = lastCollectedMetrics.get(ScalingMetric.LAG);
+
+        if (lastLag == null || currentLag == null) {
+            return Optional.empty();
+        }
+
+        var timeDiff = Duration.between(lastCollectionTime, 
clock.instant()).toSeconds();
+        return Optional.of((currentLag - lastLag) / timeDiff);
+    }
+
+    /** Query the available metric names for each job vertex for the current 
spec generation. */
+    @SneakyThrows
+    protected Map<JobVertexID, Map<String, FlinkMetric>> 
queryFilteredMetricNames(
+            FlinkService flinkService,
+            AbstractFlinkResource<?, ?> cr,
+            Configuration conf,
+            JobTopology topology) {
+
+        var jobId = 
JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());
+        var vertices = getVertexList(flinkService, cr, conf);
+
+        long deployedGeneration = getDeployedGeneration(cr);
+
+        var previousMetricNames = 
availableVertexMetricNames.get(ResourceID.fromResource(cr));
+
+        if (previousMetricNames != null) {
+            if (deployedGeneration == previousMetricNames.f0) {
+                // We have already gathered the metric names for this spec, no 
need to query again
+                return previousMetricNames.f1;
+            } else {
+                availableVertexMetricNames.remove(ResourceID.fromResource(cr));
+            }
+        }
+
+        try (var restClient = (RestClusterClient<String>) 
flinkService.getClusterClient(conf)) {
+            var names =
+                    vertices.stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            v -> v,
+                                            v ->
+                                                    
getFilteredVertexMetricNames(
+                                                            restClient, jobId, 
v, topology, conf)));
+            availableVertexMetricNames.put(
+                    ResourceID.fromResource(cr), Tuple2.of(deployedGeneration, 
names));
+            return names;
+        }
+    }
+
+    public static long getDeployedGeneration(AbstractFlinkResource<?, ?> cr) {
+        return cr.getStatus()
+                .getReconciliationStatus()
+                .deserializeLastReconciledSpecWithMeta()
+                .getMeta()
+                .getMetadata()
+                .getGeneration();
+    }
+
+    /**
+     * Query and filter metric names for a given job vertex.
+     *
+     * @param restClient Flink rest client.
+     * @param jobID Job Id.
+     * @param jobVertexID Job Vertex Id.
+     * @return Map of filtered metric names.
+     */
+    @SneakyThrows
+    protected Map<String, FlinkMetric> getFilteredVertexMetricNames(
+            RestClusterClient<?> restClient,
+            JobID jobID,
+            JobVertexID jobVertexID,
+            JobTopology topology,
+            Configuration conf) {
+
+        var allMetricNames = queryAggregatedMetricNames(restClient, jobID, 
jobVertexID);
+
+        var filteredMetrics = new HashMap<String, FlinkMetric>();
+        var requiredMetrics = new HashSet<FlinkMetric>();
+
+        requiredMetrics.add(FlinkMetric.BUSY_TIME_PER_SEC);
+
+        if (topology.isSource(jobVertexID)) {
+            
requiredMetrics.add(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
+            if (conf.getBoolean(SOURCE_SCALING_ENABLED)) {
+                requiredMetrics.add(FlinkMetric.PENDING_RECORDS);
+            } else {
+                FlinkMetric.PENDING_RECORDS
+                        .findAny(allMetricNames)
+                        .ifPresent(m -> filteredMetrics.put(m, 
FlinkMetric.PENDING_RECORDS));
+                FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC
+                        .findAny(allMetricNames)
+                        .ifPresent(
+                                m ->
+                                        filteredMetrics.put(
+                                                m,
+                                                
FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC));
+            }
+        } else {
+            // Not a source so we must have numRecordsInPerSecond
+            requiredMetrics.add(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+        }
+
+        if (!topology.getOutputs().get(jobVertexID).isEmpty()) {
+            // Not a sink so we must have numRecordsOutPerSecond
+            requiredMetrics.add(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
+        }
+
+        requiredMetrics.forEach(
+                flinkMetric -> {
+                    filteredMetrics.put(
+                            flinkMetric
+                                    .findAny(allMetricNames)
+                                    .orElseThrow(
+                                            () ->
+                                                    new RuntimeException(
+                                                            "Could not find 
required metric "
+                                                                    + 
flinkMetric.name()
+                                                                    + " for "
+                                                                    + 
jobVertexID)),
+                            flinkMetric);
+                });
+
+        return filteredMetrics;
+    }
+
+    @VisibleForTesting
+    protected Collection<AggregatedMetric> queryAggregatedMetricNames(
+            RestClusterClient<?> restClient, JobID jobID, JobVertexID 
jobVertexID)
+            throws Exception {
+        var parameters = new AggregatedSubtaskMetricsParameters();
+        var pathIt = parameters.getPathParameters().iterator();
+
+        ((JobIDPathParameter) pathIt.next()).resolve(jobID);
+        ((JobVertexIdPathParameter) pathIt.next()).resolve(jobVertexID);
+
+        return restClient
+                .sendRequest(
+                        AggregatedSubtaskMetricsHeaders.getInstance(),
+                        parameters,
+                        EmptyRequestBody.getInstance())
+                .get()
+                .getMetrics();
+    }
+
+    protected abstract Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>>
+            queryAllAggregatedMetrics(
+                    AbstractFlinkResource<?, ?> cr,
+                    FlinkService flinkService,
+                    Configuration conf,
+                    Map<JobVertexID, Map<String, FlinkMetric>> 
filteredVertexMetricNames);
+
+    @Override
+    public void cleanup(AbstractFlinkResource<?, ?> cr) {
+        LOG.info("Scaling metric cleanup");
+        var resourceId = ResourceID.fromResource(cr);
+        histories.remove(resourceId);
+        availableVertexMetricNames.remove(resourceId);
+        topologies.remove(resourceId);
+    }
+
+    @VisibleForTesting
+    protected void setClock(Clock clock) {
+        this.clock = Preconditions.checkNotNull(clock);
+    }
+
+    @VisibleForTesting
+    protected Map<ResourceID, Tuple2<Long, Map<JobVertexID, Map<String, 
FlinkMetric>>>>
+            getAvailableVertexMetricNames() {
+        return availableVertexMetricNames;
+    }
+
+    @VisibleForTesting
+    protected Map<ResourceID, SortedMap<Instant, Map<JobVertexID, 
Map<ScalingMetric, Double>>>>
+            getHistories() {
+        return histories;
+    }
+
+    @VisibleForTesting
+    protected Map<ResourceID, JobTopology> getTopologies() {
+        return topologies;
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
new file mode 100644
index 00000000..76ba7a98
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/**
+ * Enum representing the collected Flink metrics for autoscaling. The actual 
metric names depend on
+ * the JobGraph.
+ */
+public enum FlinkMetric {
+    BUSY_TIME_PER_SEC(s -> s.equals("busyTimeMsPerSecond")),
+    NUM_RECORDS_IN_PER_SEC(s -> s.equals("numRecordsInPerSecond")),
+    NUM_RECORDS_OUT_PER_SEC(s -> s.equals("numRecordsOutPerSecond")),
+    SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC(
+            s -> s.startsWith("Source__") && 
s.endsWith(".numRecordsOutPerSecond")),
+    SOURCE_TASK_NUM_RECORDS_IN_PER_SEC(
+            s -> s.startsWith("Source__") && 
s.endsWith(".numRecordsInPerSecond")),
+    PENDING_RECORDS(s -> s.endsWith(".pendingRecords"));
+
+    FlinkMetric(Predicate<String> predicate) {
+        this.predicate = predicate;
+    }
+
+    public final Predicate<String> predicate;
+
+    public Optional<String> findAny(Collection<AggregatedMetric> metrics) {
+        return 
metrics.stream().map(AggregatedMetric::getId).filter(predicate).findAny();
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
new file mode 100644
index 00000000..accecbe3
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/** IO metrics information. */
+public final class IOMetricsInfo {
+
+    public static final String FIELD_NAME_BYTES_READ = "read-bytes";
+
+    private static final String FIELD_NAME_BYTES_READ_COMPLETE = 
"read-bytes-complete";
+
+    public static final String FIELD_NAME_BYTES_WRITTEN = "write-bytes";
+
+    private static final String FIELD_NAME_BYTES_WRITTEN_COMPLETE = 
"write-bytes-complete";
+
+    public static final String FIELD_NAME_RECORDS_READ = "read-records";
+
+    private static final String FIELD_NAME_RECORDS_READ_COMPLETE = 
"read-records-complete";
+
+    public static final String FIELD_NAME_RECORDS_WRITTEN = "write-records";
+
+    private static final String FIELD_NAME_RECORDS_WRITTEN_COMPLETE = 
"write-records-complete";
+
+    public static final String FIELD_NAME_ACC_BACK_PRESSURE = 
"accumulated-backpressured-time";
+
+    public static final String FIELD_NAME_ACC_IDLE = "accumulated-idle-time";
+
+    public static final String FIELD_NAME_ACC_BUSY = "accumulated-busy-time";
+
+    @JsonProperty(FIELD_NAME_BYTES_READ)
+    private final long bytesRead;
+
+    @JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE)
+    private final boolean bytesReadComplete;
+
+    @JsonProperty(FIELD_NAME_BYTES_WRITTEN)
+    private final long bytesWritten;
+
+    @JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE)
+    private final boolean bytesWrittenComplete;
+
+    @JsonProperty(FIELD_NAME_RECORDS_READ)
+    private final long recordsRead;
+
+    @JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE)
+    private final boolean recordsReadComplete;
+
+    @JsonProperty(FIELD_NAME_RECORDS_WRITTEN)
+    private final long recordsWritten;
+
+    @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE)
+    private final boolean recordsWrittenComplete;
+
+    @JsonProperty(FIELD_NAME_ACC_BACK_PRESSURE)
+    private final Long accumulatedBackpressured;
+
+    @JsonProperty(FIELD_NAME_ACC_IDLE)
+    private final Long accumulatedIdle;
+
+    @JsonProperty(FIELD_NAME_ACC_BUSY)
+    private final Double accumulatedBusy;
+
+    @JsonCreator
+    public IOMetricsInfo(
+            @JsonProperty(FIELD_NAME_BYTES_READ) long bytesRead,
+            @JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE) boolean 
bytesReadComplete,
+            @JsonProperty(FIELD_NAME_BYTES_WRITTEN) long bytesWritten,
+            @JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE) boolean 
bytesWrittenComplete,
+            @JsonProperty(FIELD_NAME_RECORDS_READ) long recordsRead,
+            @JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE) boolean 
recordsReadComplete,
+            @JsonProperty(FIELD_NAME_RECORDS_WRITTEN) long recordsWritten,
+            @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean 
recordsWrittenComplete,
+            @JsonProperty(FIELD_NAME_ACC_BACK_PRESSURE) Long 
accumulatedBackpressured,
+            @JsonProperty(FIELD_NAME_ACC_IDLE) Long accumulatedIdle,
+            @JsonProperty(FIELD_NAME_ACC_BUSY) Double accumulatedBusy) {
+        this.bytesRead = bytesRead;
+        this.bytesReadComplete = bytesReadComplete;
+        this.bytesWritten = bytesWritten;
+        this.bytesWrittenComplete = bytesWrittenComplete;
+        this.recordsRead = recordsRead;
+        this.recordsReadComplete = recordsReadComplete;
+        this.recordsWritten = recordsWritten;
+        this.recordsWrittenComplete = recordsWrittenComplete;
+        this.accumulatedBackpressured = accumulatedBackpressured;
+        this.accumulatedIdle = accumulatedIdle;
+        this.accumulatedBusy = accumulatedBusy;
+    }
+
+    public long getBytesRead() {
+        return bytesRead;
+    }
+
+    public boolean isBytesReadComplete() {
+        return bytesReadComplete;
+    }
+
+    public long getBytesWritten() {
+        return bytesWritten;
+    }
+
+    public boolean isBytesWrittenComplete() {
+        return bytesWrittenComplete;
+    }
+
+    public long getRecordsRead() {
+        return recordsRead;
+    }
+
+    public boolean isRecordsReadComplete() {
+        return recordsReadComplete;
+    }
+
+    public long getRecordsWritten() {
+        return recordsWritten;
+    }
+
+    public boolean isRecordsWrittenComplete() {
+        return recordsWrittenComplete;
+    }
+
+    public long getAccumulatedBackpressured() {
+        return accumulatedBackpressured;
+    }
+
+    public double getAccumulatedBusy() {
+        return accumulatedBusy;
+    }
+
+    public long getAccumulatedIdle() {
+        return accumulatedIdle;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        IOMetricsInfo that = (IOMetricsInfo) o;
+        return bytesRead == that.bytesRead
+                && bytesReadComplete == that.bytesReadComplete
+                && bytesWritten == that.bytesWritten
+                && bytesWrittenComplete == that.bytesWrittenComplete
+                && recordsRead == that.recordsRead
+                && recordsReadComplete == that.recordsReadComplete
+                && recordsWritten == that.recordsWritten
+                && recordsWrittenComplete == that.recordsWrittenComplete
+                && accumulatedBackpressured == that.accumulatedBackpressured
+                && accumulatedBusy == that.accumulatedBusy
+                && accumulatedIdle == that.accumulatedIdle;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                bytesRead,
+                bytesReadComplete,
+                bytesWritten,
+                bytesWrittenComplete,
+                recordsRead,
+                recordsReadComplete,
+                recordsWritten,
+                recordsWrittenComplete,
+                accumulatedBackpressured,
+                accumulatedBusy,
+                accumulatedIdle);
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
new file mode 100644
index 00000000..11fc16a9
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/** Test for scaling metrics collection logic. */
+@EnableKubernetesMockClient(crud = true)
+public class MetricsCollectionAndEvaluationTest {
+
+    private ScalingMetricEvaluator evaluator;
+    private TestingFlinkService service;
+    private TestingMetricsCollector metricsCollector;
+    private ScalingExecutor scalingExecutor;
+
+    private FlinkDeployment app;
+    private Configuration conf;
+    private JobVertexID source1, source2, map, sink;
+    private JobTopology topology;
+
+    private KubernetesClient kubernetesClient;
+
    @BeforeEach
    public void setup() {
        // Fresh evaluator/executor/service per test; the mock KubernetesClient is injected by
        // @EnableKubernetesMockClient.
        evaluator = new ScalingMetricEvaluator();
        scalingExecutor = new ScalingExecutor(kubernetesClient);
        service = new TestingFlinkService();

        // Deployed application CR with a running job id, registered with the mock API server.
        app = TestUtils.buildApplicationCluster();
        app.getMetadata().setGeneration(1L);
        app.getStatus().getJobStatus().setJobId(new JobID().toHexString());
        kubernetesClient.resource(app).createOrReplace();

        source1 = new JobVertexID();
        source2 = new JobVertexID();
        map = new JobVertexID();
        sink = new JobVertexID();

        // Diamond topology: two sources feed a map, which feeds the sink.
        // VertexInfo args appear to be (id, inputs, parallelism, maxParallelism) — confirm against
        // the VertexInfo constructor.
        topology =
                new JobTopology(
                        new VertexInfo(source1, Set.of(), 2, 720),
                        new VertexInfo(source2, Set.of(), 2, 720),
                        new VertexInfo(map, Set.of(source1, source2), 12, 720),
                        new VertexInfo(sink, Set.of(map), 8, 24));

        metricsCollector = new TestingMetricsCollector(topology);

        // Zero stabilization/restart windows so tests don't have to wait; allow full scale-down.
        var confManager = new FlinkConfigManager(new Configuration());
        conf = confManager.getDeployConfig(app.getMetadata(), app.getSpec());
        conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
        conf.set(AutoScalerOptions.SCALING_ENABLED, true);
        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
        ReconciliationUtils.updateStatusForDeployedSpec(app, conf);
        app.getStatus().getJobStatus().setStartTime(String.valueOf(System.currentTimeMillis()));
    }
+
    @Test
    public void testEndToEnd() throws Exception {
        // 100% target utilization with no boundary: scaling tracks measured busy time directly.
        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);

        var scalingInfo = new AutoScalerInfo(new HashMap<>());

        setDefaultMetrics();
        metricsCollector.getMetricsHistory(app, scalingInfo, service, conf);

        // Test resetting the collector and make sure we can deserialize the scalingInfo correctly
        metricsCollector = new TestingMetricsCollector(topology);
        setDefaultMetrics();

        // Advance the shared clock a few seconds so a second metric observation is collected and
        // evaluation has a window to work with.
        var clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)), ZoneId.systemDefault());
        metricsCollector.setClock(clock);
        evaluator.setClock(clock);

        var collectedMetrics = metricsCollector.getMetricsHistory(app, scalingInfo, service, conf);

        var evaluation = evaluator.evaluate(conf, collectedMetrics);
        scalingExecutor.scaleResource(app, scalingInfo, conf, evaluation);

        // At full target utilization the under-utilized map/sink vertices are scaled down while
        // the fully busy sources keep their parallelism.
        var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
        assertEquals(2, scaledParallelism.get(source1));
        assertEquals(2, scaledParallelism.get(source2));
        assertEquals(6, scaledParallelism.get(map));
        assertEquals(4, scaledParallelism.get(sink));

        // Halving the target utilization should double the required parallelism everywhere.
        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);

        evaluation = evaluator.evaluate(conf, collectedMetrics);
        scalingExecutor.scaleResource(app, scalingInfo, conf, evaluation);

        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
        assertEquals(4, scaledParallelism.get(source1));
        assertEquals(4, scaledParallelism.get(source2));
        assertEquals(12, scaledParallelism.get(map));
        assertEquals(8, scaledParallelism.get(sink));

        // cleanup() must drop all per-resource state kept by collector and executor.
        var resourceID = ResourceID.fromResource(app);
        assertNotNull(metricsCollector.getHistories().get(resourceID));
        assertNotNull(metricsCollector.getTopologies().get(resourceID));

        metricsCollector.cleanup(app);
        scalingExecutor.cleanup(app);
        assertNull(metricsCollector.getHistories().get(resourceID));
        assertNull(metricsCollector.getAvailableVertexMetricNames().get(resourceID));
        assertNull(metricsCollector.getTopologies().get(resourceID));
        assertNull(metricsCollector.getTopologies().get(resourceID));
    }
+
    /**
     * Seeds the collector with a consistent steady-state load: both sources report avg busy time
     * 1000ms/s and emit 500 rec/s each with no backlog; map is 50% busy processing the combined
     * 1000 rec/s input (fanning out 2000 rec/s); sink is 50% busy consuming 2000 rec/s.
     *
     * <p>NOTE(review): positional AggregatedMetric args assumed to be (id, min, max, avg, sum) —
     * confirm against the AggregatedMetric constructor.
     */
    private void setDefaultMetrics() {
        metricsCollector.setCurrentMetrics(
                Map.of(
                        source1,
                        Map.of(
                                FlinkMetric.BUSY_TIME_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
                                FlinkMetric.PENDING_RECORDS,
                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 0.)),
                        source2,
                        Map.of(
                                FlinkMetric.BUSY_TIME_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
                                FlinkMetric.PENDING_RECORDS,
                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 0.)),
                        map,
                        Map.of(
                                FlinkMetric.BUSY_TIME_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.),
                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                new AggregatedMetric(
                                        "", Double.NaN, Double.NaN, Double.NaN, 1000.)),
                        sink,
                        Map.of(
                                FlinkMetric.BUSY_TIME_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                new AggregatedMetric(
                                        "", Double.NaN, Double.NaN, Double.NaN, 2000.))));
    }
+
    @Test
    public void testKafkaPartitionMaxParallelism() throws Exception {
        var scalingInfo = new AutoScalerInfo(new HashMap<>());

        setDefaultMetrics();
        metricsCollector.getMetricsHistory(app, scalingInfo, service, conf);

        var clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)), ZoneId.systemDefault());
        metricsCollector.setClock(clock);
        evaluator.setClock(clock);

        var collectedMetrics = metricsCollector.getMetricsHistory(app, scalingInfo, service, conf);

        // Without Kafka partition metrics, max parallelism stays at the topology-defined 720.
        assertEquals(720, collectedMetrics.getJobTopology().getMaxParallelisms().get(source1));
        assertEquals(720, collectedMetrics.getJobTopology().getMaxParallelisms().get(source2));

        clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)), ZoneId.systemDefault());
        metricsCollector.setClock(clock);
        evaluator.setClock(clock);

        // Expose 4 Kafka partition currentOffset metrics for source1: the collector should cap
        // its max parallelism at the partition count (scaling beyond it would idle subtasks).
        metricsCollector.setMetricNames(
                Map.of(
                        source1,
                        List.of(
                                new AggregatedMetric(
                                        "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.0.currentOffset"),
                                new AggregatedMetric(
                                        "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.1.currentOffset"),
                                new AggregatedMetric(
                                        "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.2.currentOffset"),
                                new AggregatedMetric(
                                        "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.3.currentOffset"))));

        collectedMetrics = metricsCollector.getMetricsHistory(app, scalingInfo, service, conf);
        // source1 capped at its 4 partitions; source2 (no partition metrics) is unchanged.
        assertEquals(4, collectedMetrics.getJobTopology().getMaxParallelisms().get(source1));
        assertEquals(720, collectedMetrics.getJobTopology().getMaxParallelisms().get(source2));
    }
+
+    @Test
+    public void testJobDetailsRestCompatibility() throws 
JsonProcessingException {
+        String flink15Response =
+                
"{\"jid\":\"068d4a00e4592099e94bb7a45f5bbd95\",\"name\":\"State machine 
job\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1667487397898,\"end-time\":-1,\"duration\":82350,\"maxParallelism\":-1,\"now\":1667487480248,\"timestamps\":{\"RUNNING\":1667487398514,\"FAILING\":0,\"CANCELLING\":0,\"FINISHED\":0,\"FAILED\":0,\"RESTARTING\":0,\"SUSPENDED\":0,\"INITIALIZING\":1667487397898,\"CANCELED\":0,\"RECONCILING\":0,\"CREATED\":1667487398210},\"vertices\":[{\"id\"
 [...]
+        String flink16Response =
+                
"{\"jid\":\"2667c218edfecda90ba9b4b23e8e14e1\",\"name\":\"State machine 
job\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1667487688693,\"end-time\":-1,\"duration\":36646,\"maxParallelism\":-1,\"now\":1667487725339,\"timestamps\":{\"RESTARTING\":0,\"RECONCILING\":0,\"INITIALIZING\":1667487688693,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0,\"RUNNING\":1667487689116,\"FAILING\":0,\"FINISHED\":0,\"CREATED\":1667487688912,\"CANCELLING\":0},\"vertices\":[{\"id\"
 [...]
+
+        var flinkObjectMapper = new ObjectMapper();
+        flinkObjectMapper.readValue(flink15Response, JobDetailsInfo.class);
+        flinkObjectMapper.readValue(flink16Response, JobDetailsInfo.class);
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
new file mode 100644
index 00000000..c487e116
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import lombok.Setter;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Testing {@link ScalingMetricCollector} implementation. */
+public class TestingMetricsCollector extends ScalingMetricCollector {
+
+    @Setter private JobTopology jobTopology;
+
+    @Setter
+    private Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> 
currentMetrics = new HashMap<>();
+
+    @Setter private Map<JobVertexID, Collection<AggregatedMetric>> metricNames 
= new HashMap<>();
+
+    public TestingMetricsCollector(JobTopology jobTopology) {
+        this.jobTopology = jobTopology;
+    }
+
+    @Override
+    protected JobTopology queryJobTopology(RestClusterClient<String> 
restClient, JobID jobId) {
+        return jobTopology;
+    }
+
+    @Override
+    protected Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> 
queryAllAggregatedMetrics(
+            AbstractFlinkResource<?, ?> cr,
+            FlinkService flinkService,
+            Configuration conf,
+            Map<JobVertexID, Map<String, FlinkMetric>> 
filteredVertexMetricNames) {
+        return currentMetrics;
+    }
+
+    @Override
+    protected Map<JobVertexID, Map<String, FlinkMetric>> 
queryFilteredMetricNames(
+            FlinkService flinkService,
+            AbstractFlinkResource<?, ?> cr,
+            Configuration conf,
+            JobTopology topology) {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    protected Collection<AggregatedMetric> queryAggregatedMetricNames(
+            RestClusterClient<?> restClient, JobID jobID, JobVertexID 
jobVertexID) {
+        return metricNames.getOrDefault(jobVertexID, Collections.emptyList());
+    }
+}

Reply via email to