This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit d535333aae312fdd3ea867743d6acfa006a95048 Author: Gyula Fora <[email protected]> AuthorDate: Thu Dec 1 08:48:51 2022 +0100 [FLINK-30260][autoscaler] Collect and compute scaling metrics through Flink REST API --- .../autoscaler/RestApiMetricsCollector.java | 100 +++++ .../autoscaler/ScalingMetricCollector.java | 443 +++++++++++++++++++++ .../operator/autoscaler/metrics/FlinkMetric.java | 49 +++ .../rest/messages/job/metrics/IOMetricsInfo.java | 191 +++++++++ .../MetricsCollectionAndEvaluationTest.java | 251 ++++++++++++ .../autoscaler/TestingMetricsCollector.java | 79 ++++ 6 files changed, 1113 insertions(+) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java new file mode 100644 index 00000000..9c359502 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; +import org.apache.flink.kubernetes.operator.service.FlinkService; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters; + +import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.stream.Collectors; + +/** Metric collector using flink rest api. 
*/
public class RestApiMetricsCollector extends ScalingMetricCollector {

    private static final Logger LOG = LoggerFactory.getLogger(RestApiMetricsCollector.class);

    /**
     * Queries the aggregated subtask metrics for every job vertex in the filtered name map.
     *
     * @param cr Flink resource being scaled.
     * @param flinkService Service used to obtain a REST client for the cluster.
     * @param conf Effective deploy configuration.
     * @param filteredVertexMetricNames Per-vertex map of raw metric name to logical metric.
     * @return Per-vertex map of logical metric to its aggregated value.
     */
    @Override
    protected Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> queryAllAggregatedMetrics(
            AbstractFlinkResource<?, ?> cr,
            FlinkService flinkService,
            Configuration conf,
            Map<JobVertexID, Map<String, FlinkMetric>> filteredVertexMetricNames) {

        return filteredVertexMetricNames.entrySet().stream()
                .collect(
                        Collectors.toMap(
                                Map.Entry::getKey,
                                entry ->
                                        queryAggregatedVertexMetrics(
                                                flinkService,
                                                cr,
                                                conf,
                                                entry.getKey(),
                                                entry.getValue())));
    }

    /**
     * Queries the given raw metric names for a single vertex through the aggregated subtask
     * metrics REST endpoint and maps each returned metric back to its logical {@link FlinkMetric}.
     *
     * @param flinkService Service used to obtain a REST client for the cluster.
     * @param cr Flink resource being scaled.
     * @param conf Effective deploy configuration.
     * @param jobVertexID Vertex to query.
     * @param metrics Raw metric name to logical metric mapping for this vertex.
     * @return Aggregated metric values keyed by logical metric.
     */
    @SneakyThrows
    protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
            FlinkService flinkService,
            AbstractFlinkResource<?, ?> cr,
            Configuration conf,
            JobVertexID jobVertexID,
            Map<String, FlinkMetric> metrics) {

        LOG.info("Querying metrics {} for {}", metrics, jobVertexID);

        var jobId = JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());

        // Resolve the two path parameters (job id, vertex id) in declaration order.
        var requestParams = new AggregatedSubtaskMetricsParameters();
        var pathParams = requestParams.getPathParameters().iterator();
        ((JobIDPathParameter) pathParams.next()).resolve(jobId);
        ((JobVertexIdPathParameter) pathParams.next()).resolve(jobVertexID);

        // The single query parameter is the comma separated list of metric names to fetch.
        var metricNameList = StringUtils.join(metrics.keySet(), ",");
        requestParams.getQueryParameters().iterator().next().resolveFromString(metricNameList);

        try (var restClient = (RestClusterClient<String>) flinkService.getClusterClient(conf)) {

            var response =
                    restClient
                            .sendRequest(
                                    AggregatedSubtaskMetricsHeaders.getInstance(),
                                    requestParams,
                                    EmptyRequestBody.getInstance())
                            .get();

            // Invert the name -> FlinkMetric mapping onto the returned values.
            return response.getMetrics().stream()
                    .collect(Collectors.toMap(m -> metrics.get(m.getId()), m -> m));
        }
    }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java new file mode 100644 index 00000000..993e6760 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java @@ -0,0 +1,443 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetrics; +import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; +import org.apache.flink.kubernetes.operator.service.FlinkService; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters; +import org.apache.flink.util.Preconditions; + +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import lombok.SneakyThrows; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import 
java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SOURCE_SCALING_ENABLED;

/**
 * Base class for collecting and computing scaling metrics for each job vertex through the Flink
 * REST API. Keeps per-resource caches (metric name lists, metric histories and job topologies)
 * that are invalidated on {@link #cleanup(AbstractFlinkResource)} or when the job restarts.
 */
public abstract class ScalingMetricCollector implements Cleanup {
    private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricCollector.class);

    // Cached metric names per resource, keyed by the deployed spec generation (f0) so we only
    // re-query names when a new spec is deployed.
    private final Map<ResourceID, Tuple2<Long, Map<JobVertexID, Map<String, FlinkMetric>>>>
            availableVertexMetricNames = new ConcurrentHashMap<>();

    // Time-ordered scaling metric history per resource, trimmed to the configured metrics window.
    private final Map<ResourceID, SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>>
            histories = new ConcurrentHashMap<>();

    // Cached job topology per resource.
    private final Map<ResourceID, JobTopology> topologies = new ConcurrentHashMap<>();

    // Overridable via setClock for deterministic tests.
    private Clock clock = Clock.systemDefaultZone();

    /**
     * Collects the current scaling metrics, appends them to the (trimmed) history and returns the
     * full collected window together with the job topology.
     *
     * @param cr Flink resource being scaled.
     * @param scalingInformation Persisted autoscaler state (metric history, job start time).
     * @param flinkService Service used to obtain a REST client for the cluster.
     * @param conf Effective deploy configuration.
     * @return Collected metrics window plus topology.
     * @throws Exception On REST communication failures.
     */
    public CollectedMetrics getMetricsHistory(
            AbstractFlinkResource<?, ?> cr,
            AutoScalerInfo scalingInformation,
            FlinkService flinkService,
            Configuration conf)
            throws Exception {

        var resourceID = ResourceID.fromResource(cr);
        var currentJobStartTs =
                Instant.ofEpochMilli(Long.parseLong(cr.getStatus().getJobStatus().getStartTime()));

        // A changed start timestamp means the job was restarted: the stored history no longer
        // applies, so drop both the persisted and the in-memory state.
        if (!currentJobStartTs.equals(
                scalingInformation.getJobStartTs().orElse(currentJobStartTs))) {
            scalingInformation.clearMetricHistory();
            cleanup(cr);
        }

        // Initialize metric history. The tailMap call trims entries that fell out of the
        // configured metrics window; note it returns a view backed by the underlying sorted map.
        var scalingMetricHistory =
                histories.compute(
                        resourceID,
                        (k, h) -> {
                            if (h == null) {
                                h = scalingInformation.getMetricHistory();
                            }
                            return h.tailMap(
                                    clock.instant()
                                            .minus(conf.get(AutoScalerOptions.METRICS_WINDOW)));
                        });

        var topology = getJobTopology(flinkService, cr, conf);

        // The filtered list of metrics we want to query for each vertex
        var filteredVertexMetricNames = queryFilteredMetricNames(flinkService, cr, conf, topology);

        // Aggregated job vertex metrics collected from Flink based on the filtered metric names
        var collectedVertexMetrics =
                queryAllAggregatedMetrics(cr, flinkService, conf, filteredVertexMetricNames);

        // The computed scaling metrics based on the collected aggregated vertex metrics
        var scalingMetrics =
                convertToScalingMetrics(resourceID, collectedVertexMetrics, topology, conf);

        // Add scaling metrics to history if they were computed successfully
        scalingMetricHistory.put(clock.instant(), scalingMetrics);
        scalingInformation.updateMetricHistory(currentJobStartTs, scalingMetricHistory);

        return new CollectedMetrics(topology, scalingMetricHistory);
    }

    /**
     * Returns the (cached) job topology for the resource and refreshes the Kafka source max
     * parallelisms based on the currently available partition metrics.
     */
    protected JobTopology getJobTopology(
            FlinkService flinkService, AbstractFlinkResource<?, ?> cr, Configuration conf)
            throws Exception {

        try (var restClient = (RestClusterClient<String>) flinkService.getClusterClient(conf)) {
            var jobId = JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());
            var topology =
                    topologies.computeIfAbsent(
                            ResourceID.fromResource(cr), r -> queryJobTopology(restClient, jobId));
            updateKafkaSourceMaxParallelisms(restClient, jobId, topology);
            return topology;
        }
    }

    /**
     * Queries the job details from the REST API and builds the topology from the JSON plan plus
     * per-vertex max parallelisms.
     */
    @VisibleForTesting
    protected JobTopology queryJobTopology(RestClusterClient<String> restClient, JobID jobId) {
        try {
            var jobDetailsInfo = restClient.getJobDetails(jobId).get();

            Map<JobVertexID, Integer> maxParallelismMap =
                    jobDetailsInfo.getJobVertexInfos().stream()
                            .collect(
                                    Collectors.toMap(
                                            JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID,
                                            JobDetailsInfo.JobVertexDetailsInfo
                                                    ::getMaxParallelism));

            return JobTopology.fromJsonPlan(jobDetailsInfo.getJsonPlan(), maxParallelismMap);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * For every source vertex (no inputs) derives an effective max parallelism from the highest
     * Kafka partition index seen in the {@code ...currentOffset} metric names.
     */
    private void updateKafkaSourceMaxParallelisms(
            RestClusterClient<String> restClient, JobID jobId, JobTopology topology)
            throws Exception {
        for (Map.Entry<JobVertexID, Set<JobVertexID>> entry : topology.getInputs().entrySet()) {
            if (entry.getValue().isEmpty()) {
                var sourceVertex = entry.getKey();
                queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream()
                        .map(AggregatedMetric::getId)
                        .filter(s -> s.endsWith(".currentOffset"))
                        .mapToInt(
                                s -> {
                                    // We extract the partition from the pattern:
                                    // ...topic.[topic].partition.3.currentOffset
                                    var split = s.split("\\.");
                                    return Integer.parseInt(split[split.length - 2]);
                                })
                        .max()
                        .ifPresent(
                                p -> {
                                    LOG.info(
                                            "Updating source {} max parallelism based on available partitions to {}",
                                            sourceVertex,
                                            p + 1);
                                    // Partition indices are zero based, hence max index + 1.
                                    topology.updateMaxParallelism(sourceVertex, p + 1);
                                });
            }
        }
    }

    /** Returns all job vertex ids of the (cached) topology. */
    private List<JobVertexID> getVertexList(
            FlinkService flinkService, AbstractFlinkResource<?, ?> cr, Configuration conf)
            throws Exception {
        JobTopology topology = getJobTopology(flinkService, cr, conf);
        return new ArrayList<>(topology.getParallelisms().keySet());
    }

    /**
     * Given a map of collected Flink vertex metrics we compute the scaling metrics for each job
     * vertex.
     *
     * @param collectedMetrics Collected metrics for all job vertices.
     * @return Computed scaling metrics for all job vertices.
     */
    private Map<JobVertexID, Map<ScalingMetric, Double>> convertToScalingMetrics(
            ResourceID resourceID,
            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> collectedMetrics,
            JobTopology jobTopology,
            Configuration conf) {

        var out = new HashMap<JobVertexID, Map<ScalingMetric, Double>>();
        collectedMetrics.forEach(
                (jobVertexID, vertexFlinkMetrics) -> {
                    LOG.info(
                            "Calculating vertex scaling metrics for {} from {}",
                            jobVertexID,
                            vertexFlinkMetrics);
                    var vertexScalingMetrics = new HashMap<ScalingMetric, Double>();
                    out.put(jobVertexID, vertexScalingMetrics);

                    // Lag must be computed first as the data rate metrics depend on it (via the
                    // lag growth rate below).
                    ScalingMetrics.computeLagMetrics(vertexFlinkMetrics, vertexScalingMetrics);
                    ScalingMetrics.computeLoadMetrics(vertexFlinkMetrics, vertexScalingMetrics);

                    Optional<Double> lagGrowthRate =
                            computeLagGrowthRate(
                                    resourceID,
                                    jobVertexID,
                                    vertexScalingMetrics.get(ScalingMetric.LAG));

                    ScalingMetrics.computeDataRateMetrics(
                            jobVertexID,
                            vertexFlinkMetrics,
                            vertexScalingMetrics,
                            jobTopology,
                            lagGrowthRate,
                            conf);

                    LOG.info(
                            "Vertex scaling metrics for {}: {}", jobVertexID, vertexScalingMetrics);
                });

        return out;
    }

    /**
     * Computes the lag growth rate (records/second) between the last collected lag value and the
     * current one. Empty when there is no usable previous observation.
     *
     * <p>NOTE(review): timeDiff may be 0 if two collections happen within the same second, which
     * would produce an Infinity/NaN rate — confirm upstream handling.
     */
    @NotNull
    private Optional<Double> computeLagGrowthRate(
            ResourceID resourceID, JobVertexID jobVertexID, Double currentLag) {
        var metricHistory = histories.get(resourceID);

        if (metricHistory == null || metricHistory.isEmpty()) {
            return Optional.empty();
        }

        var lastCollectionTime = metricHistory.lastKey();
        var lastCollectedMetrics = metricHistory.get(lastCollectionTime).get(jobVertexID);

        if (lastCollectedMetrics == null) {
            return Optional.empty();
        }

        var lastLag = lastCollectedMetrics.get(ScalingMetric.LAG);

        if (lastLag == null || currentLag == null) {
            return Optional.empty();
        }

        var timeDiff = Duration.between(lastCollectionTime, clock.instant()).toSeconds();
        return Optional.of((currentLag - lastLag) / timeDiff);
    }

    /**
     * Query the available metric names for each job vertex for the current spec generation. The
     * result is cached per resource and only re-queried when a new spec generation is deployed.
     */
    @SneakyThrows
    protected Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames(
            FlinkService flinkService,
            AbstractFlinkResource<?, ?> cr,
            Configuration conf,
            JobTopology topology) {

        var jobId = JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());
        var vertices = getVertexList(flinkService, cr, conf);

        long deployedGeneration = getDeployedGeneration(cr);

        var previousMetricNames = availableVertexMetricNames.get(ResourceID.fromResource(cr));

        if (previousMetricNames != null) {
            if (deployedGeneration == previousMetricNames.f0) {
                // We have already gathered the metric names for this spec, no need to query again
                return previousMetricNames.f1;
            } else {
                availableVertexMetricNames.remove(ResourceID.fromResource(cr));
            }
        }

        try (var restClient = (RestClusterClient<String>) flinkService.getClusterClient(conf)) {
            var names =
                    vertices.stream()
                            .collect(
                                    Collectors.toMap(
                                            v -> v,
                                            v ->
                                                    getFilteredVertexMetricNames(
                                                            restClient, jobId, v, topology, conf)));
            availableVertexMetricNames.put(
                    ResourceID.fromResource(cr), Tuple2.of(deployedGeneration, names));
            return names;
        }
    }

    /** Returns the Kubernetes generation of the last reconciled (deployed) spec. */
    public static long getDeployedGeneration(AbstractFlinkResource<?, ?> cr) {
        return cr.getStatus()
                .getReconciliationStatus()
                .deserializeLastReconciledSpecWithMeta()
                .getMeta()
                .getMetadata()
                .getGeneration();
    }

    /**
     * Query and filter metric names for a given job vertex.
     *
     * @param restClient Flink rest client.
     * @param jobID Job Id.
     * @param jobVertexID Job Vertex Id.
     * @return Map of filtered metric names.
     * @throws RuntimeException If a required metric is not exposed by the vertex.
     */
    @SneakyThrows
    protected Map<String, FlinkMetric> getFilteredVertexMetricNames(
            RestClusterClient<?> restClient,
            JobID jobID,
            JobVertexID jobVertexID,
            JobTopology topology,
            Configuration conf) {

        var allMetricNames = queryAggregatedMetricNames(restClient, jobID, jobVertexID);

        var filteredMetrics = new HashMap<String, FlinkMetric>();
        var requiredMetrics = new HashSet<FlinkMetric>();

        // Busy time is always required, for sources and non-sources alike.
        requiredMetrics.add(FlinkMetric.BUSY_TIME_PER_SEC);

        if (topology.isSource(jobVertexID)) {
            requiredMetrics.add(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
            if (conf.getBoolean(SOURCE_SCALING_ENABLED)) {
                requiredMetrics.add(FlinkMetric.PENDING_RECORDS);
            } else {
                // Source scaling disabled: pendingRecords and the source output rate are
                // collected opportunistically (best effort) instead of being required.
                // NOTE(review): confirm the required/optional split is intentional here.
                FlinkMetric.PENDING_RECORDS
                        .findAny(allMetricNames)
                        .ifPresent(m -> filteredMetrics.put(m, FlinkMetric.PENDING_RECORDS));
                FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC
                        .findAny(allMetricNames)
                        .ifPresent(
                                m ->
                                        filteredMetrics.put(
                                                m,
                                                FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC));
            }
        } else {
            // Not a source so we must have numRecordsInPerSecond
            requiredMetrics.add(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
        }

        if (!topology.getOutputs().get(jobVertexID).isEmpty()) {
            // Not a sink so we must have numRecordsOutPerSecond
            requiredMetrics.add(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
        }

        // All required metrics must resolve to an actual metric name, otherwise fail loudly.
        requiredMetrics.forEach(
                flinkMetric -> {
                    filteredMetrics.put(
                            flinkMetric
                                    .findAny(allMetricNames)
                                    .orElseThrow(
                                            () ->
                                                    new RuntimeException(
                                                            "Could not find required metric "
                                                                    + flinkMetric.name()
                                                                    + " for "
                                                                    + jobVertexID)),
                            flinkMetric);
                });

        return filteredMetrics;
    }

    /** Queries all available aggregated subtask metric names for the given vertex. */
    @VisibleForTesting
    protected Collection<AggregatedMetric> queryAggregatedMetricNames(
            RestClusterClient<?> restClient, JobID jobID, JobVertexID jobVertexID)
            throws Exception {
        var parameters = new AggregatedSubtaskMetricsParameters();
        var pathIt = parameters.getPathParameters().iterator();

        ((JobIDPathParameter) pathIt.next()).resolve(jobID);
        ((JobVertexIdPathParameter) pathIt.next()).resolve(jobVertexID);

        return restClient
                .sendRequest(
                        AggregatedSubtaskMetricsHeaders.getInstance(),
                        parameters,
                        EmptyRequestBody.getInstance())
                .get()
                .getMetrics();
    }

    /** Implemented by subclasses to fetch aggregated metric values for all vertices. */
    protected abstract Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>>
            queryAllAggregatedMetrics(
                    AbstractFlinkResource<?, ?> cr,
                    FlinkService flinkService,
                    Configuration conf,
                    Map<JobVertexID, Map<String, FlinkMetric>> filteredVertexMetricNames);

    /** Drops all cached state (history, metric names, topology) for the given resource. */
    @Override
    public void cleanup(AbstractFlinkResource<?, ?> cr) {
        LOG.info("Scaling metric cleanup");
        var resourceId = ResourceID.fromResource(cr);
        histories.remove(resourceId);
        availableVertexMetricNames.remove(resourceId);
        topologies.remove(resourceId);
    }

    @VisibleForTesting
    protected void setClock(Clock clock) {
        this.clock = Preconditions.checkNotNull(clock);
    }

    @VisibleForTesting
    protected Map<ResourceID, Tuple2<Long, Map<JobVertexID, Map<String, FlinkMetric>>>>
            getAvailableVertexMetricNames() {
        return availableVertexMetricNames;
    }

    @VisibleForTesting
    protected Map<ResourceID, SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>>
            getHistories() {
        return histories;
    }

    @VisibleForTesting
    protected Map<ResourceID, JobTopology> getTopologies() {
        return topologies;
    }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
new file mode 100644
index 00000000..76ba7a98
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

package org.apache.flink.kubernetes.operator.autoscaler.metrics;

import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;

import java.util.Collection;
import java.util.Optional;
import java.util.function.Predicate;

/**
 * Logical Flink metrics collected for autoscaling. Since the concrete metric names depend on the
 * JobGraph, each constant carries a predicate that recognizes its raw metric name.
 */
public enum FlinkMetric {
    BUSY_TIME_PER_SEC(name -> name.equals("busyTimeMsPerSecond")),
    NUM_RECORDS_IN_PER_SEC(name -> name.equals("numRecordsInPerSecond")),
    NUM_RECORDS_OUT_PER_SEC(name -> name.equals("numRecordsOutPerSecond")),
    SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC(
            name -> name.startsWith("Source__") && name.endsWith(".numRecordsOutPerSecond")),
    SOURCE_TASK_NUM_RECORDS_IN_PER_SEC(
            name -> name.startsWith("Source__") && name.endsWith(".numRecordsInPerSecond")),
    PENDING_RECORDS(name -> name.endsWith(".pendingRecords"));

    /** Matcher deciding whether a raw metric name belongs to this logical metric. */
    public final Predicate<String> predicate;

    FlinkMetric(Predicate<String> predicate) {
        this.predicate = predicate;
    }

    /**
     * Returns the id of any metric in the given collection that matches this logical metric, or
     * empty when none does.
     */
    public Optional<String> findAny(Collection<AggregatedMetric> metrics) {
        for (AggregatedMetric metric : metrics) {
            var id = metric.getId();
            if (predicate.test(id)) {
                return Optional.of(id);
            }
        }
        return Optional.empty();
    }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
new file mode 100644 index 00000000..accecbe3 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** IO metrics information. 
*/ +public final class IOMetricsInfo { + + public static final String FIELD_NAME_BYTES_READ = "read-bytes"; + + private static final String FIELD_NAME_BYTES_READ_COMPLETE = "read-bytes-complete"; + + public static final String FIELD_NAME_BYTES_WRITTEN = "write-bytes"; + + private static final String FIELD_NAME_BYTES_WRITTEN_COMPLETE = "write-bytes-complete"; + + public static final String FIELD_NAME_RECORDS_READ = "read-records"; + + private static final String FIELD_NAME_RECORDS_READ_COMPLETE = "read-records-complete"; + + public static final String FIELD_NAME_RECORDS_WRITTEN = "write-records"; + + private static final String FIELD_NAME_RECORDS_WRITTEN_COMPLETE = "write-records-complete"; + + public static final String FIELD_NAME_ACC_BACK_PRESSURE = "accumulated-backpressured-time"; + + public static final String FIELD_NAME_ACC_IDLE = "accumulated-idle-time"; + + public static final String FIELD_NAME_ACC_BUSY = "accumulated-busy-time"; + + @JsonProperty(FIELD_NAME_BYTES_READ) + private final long bytesRead; + + @JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE) + private final boolean bytesReadComplete; + + @JsonProperty(FIELD_NAME_BYTES_WRITTEN) + private final long bytesWritten; + + @JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE) + private final boolean bytesWrittenComplete; + + @JsonProperty(FIELD_NAME_RECORDS_READ) + private final long recordsRead; + + @JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE) + private final boolean recordsReadComplete; + + @JsonProperty(FIELD_NAME_RECORDS_WRITTEN) + private final long recordsWritten; + + @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) + private final boolean recordsWrittenComplete; + + @JsonProperty(FIELD_NAME_ACC_BACK_PRESSURE) + private final Long accumulatedBackpressured; + + @JsonProperty(FIELD_NAME_ACC_IDLE) + private final Long accumulatedIdle; + + @JsonProperty(FIELD_NAME_ACC_BUSY) + private final Double accumulatedBusy; + + @JsonCreator + public IOMetricsInfo( + @JsonProperty(FIELD_NAME_BYTES_READ) long 
bytesRead, + @JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE) boolean bytesReadComplete, + @JsonProperty(FIELD_NAME_BYTES_WRITTEN) long bytesWritten, + @JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE) boolean bytesWrittenComplete, + @JsonProperty(FIELD_NAME_RECORDS_READ) long recordsRead, + @JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE) boolean recordsReadComplete, + @JsonProperty(FIELD_NAME_RECORDS_WRITTEN) long recordsWritten, + @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete, + @JsonProperty(FIELD_NAME_ACC_BACK_PRESSURE) Long accumulatedBackpressured, + @JsonProperty(FIELD_NAME_ACC_IDLE) Long accumulatedIdle, + @JsonProperty(FIELD_NAME_ACC_BUSY) Double accumulatedBusy) { + this.bytesRead = bytesRead; + this.bytesReadComplete = bytesReadComplete; + this.bytesWritten = bytesWritten; + this.bytesWrittenComplete = bytesWrittenComplete; + this.recordsRead = recordsRead; + this.recordsReadComplete = recordsReadComplete; + this.recordsWritten = recordsWritten; + this.recordsWrittenComplete = recordsWrittenComplete; + this.accumulatedBackpressured = accumulatedBackpressured; + this.accumulatedIdle = accumulatedIdle; + this.accumulatedBusy = accumulatedBusy; + } + + public long getBytesRead() { + return bytesRead; + } + + public boolean isBytesReadComplete() { + return bytesReadComplete; + } + + public long getBytesWritten() { + return bytesWritten; + } + + public boolean isBytesWrittenComplete() { + return bytesWrittenComplete; + } + + public long getRecordsRead() { + return recordsRead; + } + + public boolean isRecordsReadComplete() { + return recordsReadComplete; + } + + public long getRecordsWritten() { + return recordsWritten; + } + + public boolean isRecordsWrittenComplete() { + return recordsWrittenComplete; + } + + public long getAccumulatedBackpressured() { + return accumulatedBackpressured; + } + + public double getAccumulatedBusy() { + return accumulatedBusy; + } + + public long getAccumulatedIdle() { + return accumulatedIdle; + 
} + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IOMetricsInfo that = (IOMetricsInfo) o; + return bytesRead == that.bytesRead + && bytesReadComplete == that.bytesReadComplete + && bytesWritten == that.bytesWritten + && bytesWrittenComplete == that.bytesWrittenComplete + && recordsRead == that.recordsRead + && recordsReadComplete == that.recordsReadComplete + && recordsWritten == that.recordsWritten + && recordsWrittenComplete == that.recordsWrittenComplete + && accumulatedBackpressured == that.accumulatedBackpressured + && accumulatedBusy == that.accumulatedBusy + && accumulatedIdle == that.accumulatedIdle; + } + + @Override + public int hashCode() { + return Objects.hash( + bytesRead, + bytesReadComplete, + bytesWritten, + bytesWrittenComplete, + recordsRead, + recordsReadComplete, + recordsWritten, + recordsWrittenComplete, + accumulatedBackpressured, + accumulatedBusy, + accumulatedIdle); + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java new file mode 100644 index 00000000..11fc16a9 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.TestUtils; +import org.apache.flink.kubernetes.operator.TestingFlinkService; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; +import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; +import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import 
java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

/** Test for scaling metrics collection logic. */
@EnableKubernetesMockClient(crud = true)
public class MetricsCollectionAndEvaluationTest {

    private ScalingMetricEvaluator evaluator;
    private TestingFlinkService service;
    private TestingMetricsCollector metricsCollector;
    private ScalingExecutor scalingExecutor;

    private FlinkDeployment app;
    private Configuration conf;
    // Test job graph: (source1, source2) -> map -> sink, wired up in setup().
    private JobVertexID source1, source2, map, sink;
    private JobTopology topology;

    // Injected by the @EnableKubernetesMockClient extension (crud-mode mock server).
    private KubernetesClient kubernetesClient;

    /**
     * Builds an application deployment CR, a 4-vertex topology and an autoscaler configuration
     * with stabilization/restart intervals zeroed out so evaluation happens immediately.
     */
    @BeforeEach
    public void setup() {
        evaluator = new ScalingMetricEvaluator();
        scalingExecutor = new ScalingExecutor(kubernetesClient);
        service = new TestingFlinkService();

        app = TestUtils.buildApplicationCluster();
        app.getMetadata().setGeneration(1L);
        app.getStatus().getJobStatus().setJobId(new JobID().toHexString());
        kubernetesClient.resource(app).createOrReplace();

        source1 = new JobVertexID();
        source2 = new JobVertexID();
        map = new JobVertexID();
        sink = new JobVertexID();

        // VertexInfo args: (id, inputs, parallelism, maxParallelism) — sources at 2/720,
        // map at 12/720, sink at 8/24.
        topology =
                new JobTopology(
                        new VertexInfo(source1, Set.of(), 2, 720),
                        new VertexInfo(source2, Set.of(), 2, 720),
                        new VertexInfo(map, Set.of(source1, source2), 12, 720),
                        new VertexInfo(sink, Set.of(map), 8, 24));

        metricsCollector = new TestingMetricsCollector(topology);

        var confManager = new FlinkConfigManager(new Configuration());
        conf = confManager.getDeployConfig(app.getMetadata(), app.getSpec());
        conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
        conf.set(AutoScalerOptions.SCALING_ENABLED, true);
        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
        ReconciliationUtils.updateStatusForDeployedSpec(app, conf);
        app.getStatus().getJobStatus().setStartTime(String.valueOf(System.currentTimeMillis()));
    }

    /**
     * Full pipeline: collect metrics, evaluate, execute scaling; verifies the computed
     * parallelism overrides at target utilization 1.0 and again at 0.5, then checks that
     * cleanup() clears all per-resource collector state.
     */
    @Test
    public void testEndToEnd() throws Exception {
        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);

        var scalingInfo = new AutoScalerInfo(new HashMap<>());

        setDefaultMetrics();
        metricsCollector.getMetricsHistory(app, scalingInfo, service, conf);

        // Test resetting the collector and make sure we can deserialize the scalingInfo correctly
        metricsCollector = new TestingMetricsCollector(topology);
        setDefaultMetrics();

        // Advance a fixed clock past the (zeroed) stabilization window for both components.
        var clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)), ZoneId.systemDefault());
        metricsCollector.setClock(clock);
        evaluator.setClock(clock);

        var collectedMetrics = metricsCollector.getMetricsHistory(app, scalingInfo, service, conf);

        var evaluation = evaluator.evaluate(conf, collectedMetrics);
        scalingExecutor.scaleResource(app, scalingInfo, conf, evaluation);

        // At 100% target utilization everything scales down relative to the topology defaults.
        var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
        assertEquals(2, scaledParallelism.get(source1));
        assertEquals(2, scaledParallelism.get(source2));
        assertEquals(6, scaledParallelism.get(map));
        assertEquals(4, scaledParallelism.get(sink));

        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);

        // Halving the utilization target doubles the required parallelism.
        evaluation = evaluator.evaluate(conf, collectedMetrics);
        scalingExecutor.scaleResource(app, scalingInfo, conf, evaluation);

        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
        assertEquals(4, scaledParallelism.get(source1));
        assertEquals(4, scaledParallelism.get(source2));
        assertEquals(12, scaledParallelism.get(map));
        assertEquals(8, scaledParallelism.get(sink));

        var resourceID = ResourceID.fromResource(app);
        assertNotNull(metricsCollector.getHistories().get(resourceID));
        assertNotNull(metricsCollector.getTopologies().get(resourceID));

        metricsCollector.cleanup(app);
        scalingExecutor.cleanup(app);
        assertNull(metricsCollector.getHistories().get(resourceID));
        assertNull(metricsCollector.getAvailableVertexMetricNames().get(resourceID));
        assertNull(metricsCollector.getTopologies().get(resourceID));
        // NOTE(review): duplicate of the previous assertion — looks like an unintended
        // copy-paste; consider removing.
        assertNull(metricsCollector.getTopologies().get(resourceID));
    }

    /**
     * Seeds the testing collector with a balanced metric set. AggregatedMetric arguments are
     * (id, min, max, avg, sum) — presumably matching Flink's REST message; busy time uses avg,
     * record rates and pending records use sum. TODO confirm against the Flink REST API.
     */
    private void setDefaultMetrics() {
        metricsCollector.setCurrentMetrics(
                Map.of(
                        source1,
                        Map.of(
                                FlinkMetric.BUSY_TIME_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
                                FlinkMetric.PENDING_RECORDS,
                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 0.)),
                        source2,
                        Map.of(
                                FlinkMetric.BUSY_TIME_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
                                FlinkMetric.PENDING_RECORDS,
                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 0.)),
                        map,
                        Map.of(
                                FlinkMetric.BUSY_TIME_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.),
                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                new AggregatedMetric(
                                        "", Double.NaN, Double.NaN, Double.NaN, 1000.)),
                        sink,
                        Map.of(
                                FlinkMetric.BUSY_TIME_PER_SEC,
                                new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                new AggregatedMetric(
                                        "", Double.NaN, Double.NaN, Double.NaN, 2000.))));
    }

    /**
     * When a source exposes Kafka partition offset metrics, the observed partition count (4 here)
     * caps that vertex's max parallelism; the other source keeps the topology's 720.
     */
    @Test
    public void testKafkaPartitionMaxParallelism() throws Exception {
        var scalingInfo = new AutoScalerInfo(new HashMap<>());

        setDefaultMetrics();
        metricsCollector.getMetricsHistory(app, scalingInfo, service, conf);

        var clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)), ZoneId.systemDefault());
        metricsCollector.setClock(clock);
        evaluator.setClock(clock);

        var collectedMetrics = metricsCollector.getMetricsHistory(app, scalingInfo, service, conf);

        assertEquals(720, collectedMetrics.getJobTopology().getMaxParallelisms().get(source1));
        assertEquals(720, collectedMetrics.getJobTopology().getMaxParallelisms().get(source2));

        clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)), ZoneId.systemDefault());
        metricsCollector.setClock(clock);
        evaluator.setClock(clock);

        // Four currentOffset metrics -> four Kafka partitions on source1.
        metricsCollector.setMetricNames(
                Map.of(
                        source1,
                        List.of(
                                new AggregatedMetric(
                                        "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.0.currentOffset"),
                                new AggregatedMetric(
                                        "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.1.currentOffset"),
                                new AggregatedMetric(
                                        "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.2.currentOffset"),
                                new AggregatedMetric(
                                        "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.3.currentOffset")))));

        collectedMetrics = metricsCollector.getMetricsHistory(app, scalingInfo, service, conf);
        assertEquals(4, collectedMetrics.getJobTopology().getMaxParallelisms().get(source1));
        assertEquals(720, collectedMetrics.getJobTopology().getMaxParallelisms().get(source2));
    }

    /**
     * Verifies captured Flink 1.15 and 1.16 job-details REST responses still deserialize into
     * {@link JobDetailsInfo}. NOTE(review): the response literals below were truncated by the
     * mail archive ("[...]"); the full strings live in the repository version of this file.
     */
    @Test
    public void testJobDetailsRestCompatibility() throws JsonProcessingException {
        String flink15Response =
                "{\"jid\":\"068d4a00e4592099e94bb7a45f5bbd95\",\"name\":\"State machine job\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1667487397898,\"end-time\":-1,\"duration\":82350,\"maxParallelism\":-1,\"now\":1667487480248,\"timestamps\":{\"RUNNING\":1667487398514,\"FAILING\":0,\"CANCELLING\":0,\"FINISHED\":0,\"FAILED\":0,\"RESTARTING\":0,\"SUSPENDED\":0,\"INITIALIZING\":1667487397898,\"CANCELED\":0,\"RECONCILING\":0,\"CREATED\":1667487398210},\"vertices\":[{\"id\" [...]
        String flink16Response =
                "{\"jid\":\"2667c218edfecda90ba9b4b23e8e14e1\",\"name\":\"State machine job\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1667487688693,\"end-time\":-1,\"duration\":36646,\"maxParallelism\":-1,\"now\":1667487725339,\"timestamps\":{\"RESTARTING\":0,\"RECONCILING\":0,\"INITIALIZING\":1667487688693,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0,\"RUNNING\":1667487689116,\"FAILING\":0,\"FINISHED\":0,\"CREATED\":1667487688912,\"CANCELLING\":0},\"vertices\":[{\"id\" [...]

        var flinkObjectMapper = new ObjectMapper();
        flinkObjectMapper.readValue(flink15Response, JobDetailsInfo.class);
        flinkObjectMapper.readValue(flink16Response, JobDetailsInfo.class);
    }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java new file mode 100644 index 00000000..c487e116 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; +import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; +import org.apache.flink.kubernetes.operator.service.FlinkService; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; + +import lombok.Setter; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** Testing {@link ScalingMetricCollector} implementation. 
*/ +public class TestingMetricsCollector extends ScalingMetricCollector { + + @Setter private JobTopology jobTopology; + + @Setter + private Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> currentMetrics = new HashMap<>(); + + @Setter private Map<JobVertexID, Collection<AggregatedMetric>> metricNames = new HashMap<>(); + + public TestingMetricsCollector(JobTopology jobTopology) { + this.jobTopology = jobTopology; + } + + @Override + protected JobTopology queryJobTopology(RestClusterClient<String> restClient, JobID jobId) { + return jobTopology; + } + + @Override + protected Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> queryAllAggregatedMetrics( + AbstractFlinkResource<?, ?> cr, + FlinkService flinkService, + Configuration conf, + Map<JobVertexID, Map<String, FlinkMetric>> filteredVertexMetricNames) { + return currentMetrics; + } + + @Override + protected Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames( + FlinkService flinkService, + AbstractFlinkResource<?, ?> cr, + Configuration conf, + JobTopology topology) { + return Collections.emptyMap(); + } + + @Override + protected Collection<AggregatedMetric> queryAggregatedMetricNames( + RestClusterClient<?> restClient, JobID jobID, JobVertexID jobVertexID) { + return metricNames.getOrDefault(jobVertexID, Collections.emptyList()); + } +}
