This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 5aa9956e7c61a5d58fb70a5ef8dd0ccb2dcabf53 Author: Gyula Fora <[email protected]> AuthorDate: Thu Dec 1 08:55:50 2022 +0100 [FLINK-30260][autoscaler] Utilities for collecting and computing metrics --- .../operator/autoscaler/topology/JobTopology.java | 154 +++++++++++++++++++++ .../operator/autoscaler/topology/VertexInfo.java | 37 +++++ .../operator/autoscaler/utils/AutoScalerUtils.java | 70 ++++++++++ .../autoscaler/utils/JobVertexSerDeModule.java | 54 ++++++++ .../operator/autoscaler/JobTopologyTest.java | 99 +++++++++++++ 5 files changed, 414 insertions(+) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java new file mode 100644 index 00000000..3f2462da --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.autoscaler.topology; + +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap; +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** Structure representing information about the jobgraph that is relevant for scaling. */ +@ToString +@EqualsAndHashCode +public class JobTopology { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + @Getter private final ImmutableMap<JobVertexID, Set<JobVertexID>> inputs; + @Getter private final ImmutableMap<JobVertexID, Set<JobVertexID>> outputs; + @Getter private final ImmutableMap<JobVertexID, Integer> parallelisms; + private final ImmutableMap<JobVertexID, Integer> originalMaxParallelism; + @Getter private final Map<JobVertexID, Integer> maxParallelisms; + + public JobTopology(VertexInfo... 
vertexInfo) { + this(Set.of(vertexInfo)); + } + + public JobTopology(Set<VertexInfo> vertexInfo) { + + Map<JobVertexID, Set<JobVertexID>> vertexOutputs = new HashMap<>(); + Map<JobVertexID, Set<JobVertexID>> vertexInputs = new HashMap<>(); + Map<JobVertexID, Integer> vertexParallelism = new HashMap<>(); + maxParallelisms = new HashMap<>(); + + vertexInfo.forEach( + info -> { + var vertexId = info.getId(); + vertexParallelism.put(vertexId, info.getParallelism()); + maxParallelisms.put(vertexId, info.getMaxParallelism()); + + vertexInputs.put(vertexId, info.getInputs()); + vertexOutputs.computeIfAbsent(vertexId, id -> new HashSet<>()); + info.getInputs() + .forEach( + inputId -> + vertexOutputs + .computeIfAbsent(inputId, id -> new HashSet<>()) + .add(vertexId)); + }); + + var outputBuilder = ImmutableMap.<JobVertexID, Set<JobVertexID>>builder(); + vertexOutputs.forEach((id, l) -> outputBuilder.put(id, ImmutableSet.copyOf(l))); + outputs = outputBuilder.build(); + + var inputBuilder = ImmutableMap.<JobVertexID, Set<JobVertexID>>builder(); + vertexInputs.forEach((id, l) -> inputBuilder.put(id, ImmutableSet.copyOf(l))); + this.inputs = inputBuilder.build(); + + this.parallelisms = ImmutableMap.copyOf(vertexParallelism); + this.originalMaxParallelism = ImmutableMap.copyOf(maxParallelisms); + } + + public boolean isSource(JobVertexID jobVertexID) { + return getInputs().get(jobVertexID).isEmpty(); + } + + public void updateMaxParallelism(JobVertexID vertexID, int maxParallelism) { + maxParallelisms.put( + vertexID, Math.min(originalMaxParallelism.get(vertexID), maxParallelism)); + } + + public List<JobVertexID> getVerticesInTopologicalOrder() { + List<JobVertexID> sorted = new ArrayList<>(inputs.size()); + + Map<JobVertexID, List<JobVertexID>> remainingInputs = new HashMap<>(inputs.size()); + inputs.forEach((v, l) -> remainingInputs.put(v, new ArrayList<>(l))); + + while (!remainingInputs.isEmpty()) { + List<JobVertexID> verticesWithZeroIndegree = new ArrayList<>(); + 
remainingInputs.forEach( + (v, inputs) -> { + if (inputs.isEmpty()) { + verticesWithZeroIndegree.add(v); + } + }); + + verticesWithZeroIndegree.forEach( + v -> { + remainingInputs.remove(v); + outputs.get(v).forEach(o -> remainingInputs.get(o).remove(v)); + }); + + sorted.addAll(verticesWithZeroIndegree); + } + return sorted; + } + + public static JobTopology fromJsonPlan( + String jsonPlan, Map<JobVertexID, Integer> maxParallelismMap) + throws JsonProcessingException { + ObjectNode plan = objectMapper.readValue(jsonPlan, ObjectNode.class); + ArrayNode nodes = (ArrayNode) plan.get("nodes"); + + var vertexInfo = new HashSet<VertexInfo>(); + + for (JsonNode node : nodes) { + var vertexId = JobVertexID.fromHexString(node.get("id").asText()); + var inputList = new HashSet<JobVertexID>(); + vertexInfo.add( + new VertexInfo( + vertexId, + inputList, + node.get("parallelism").asInt(), + maxParallelismMap.get(vertexId))); + if (node.has("inputs")) { + for (JsonNode input : node.get("inputs")) { + inputList.add(JobVertexID.fromHexString(input.get("id").asText())); + } + } + } + + return new JobTopology(vertexInfo); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java new file mode 100644 index 00000000..1137e9e6 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.kubernetes.operator.autoscaler.topology;

import org.apache.flink.runtime.jobgraph.JobVertexID;

import lombok.Value;

import java.util.Set;

/** Job vertex information. */
@Value
public class VertexInfo {

    // Id of this vertex in the job graph.
    JobVertexID id;

    // Ids of the upstream vertices feeding this vertex; empty for sources.
    // NOTE(review): JobTopology.fromJsonPlan fills this set after construction,
    // so the set is not defensively copied here.
    Set<JobVertexID> inputs;

    // Current parallelism of the vertex.
    int parallelism;

    // Max parallelism of the vertex.
    int maxParallelism;
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler.utils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; + +import java.util.Map; + +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; + +/** AutoScaler utilities. */ +public class AutoScalerUtils { + + public static double getTargetProcessingCapacity( + Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics, + Configuration conf, + double targetUtilization, + boolean withRestart) { + + // Target = Lag Catchup Rate + Restart Catchup Rate + Processing at utilization + // Target = LAG/CATCH_UP + INPUT_RATE*RESTART/CATCH_UP + INPUT_RATE/TARGET_UTIL + + double lagCatchupTargetRate = evaluatedMetrics.get(CATCH_UP_DATA_RATE).getCurrent(); + if (Double.isNaN(lagCatchupTargetRate)) { + return Double.NaN; + } + + double catchUpTargetSec = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds(); + double restartTimeSec = conf.get(AutoScalerOptions.RESTART_TIME).toSeconds(); + + targetUtilization = Math.max(0., targetUtilization); + targetUtilization = Math.min(1., targetUtilization); + + double avgInputTargetRate = evaluatedMetrics.get(TARGET_DATA_RATE).getAverage(); + if (Double.isNaN(avgInputTargetRate)) { + return Double.NaN; + } + + if (targetUtilization == 0) { + return Double.POSITIVE_INFINITY; + } + + double restartCatchupRate = + !withRestart || catchUpTargetSec == 0 + ? 
0 + : (avgInputTargetRate * restartTimeSec) / catchUpTargetSec; + double inputTargetAtUtilization = avgInputTargetRate / targetUtilization; + + return Math.round(lagCatchupTargetRate + restartCatchupRate + inputTargetAtUtilization); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java new file mode 100644 index 00000000..5a5195a0 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.autoscaler.utils; + +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.KeyDeserializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.module.SimpleModule; + +import java.io.IOException; + +/** Jackson serializer module for {@link JobVertexID}. */ +public class JobVertexSerDeModule extends SimpleModule { + + public JobVertexSerDeModule() { + this.addKeySerializer(JobVertexID.class, new JobVertexIdKeySerializer()); + this.addKeyDeserializer(JobVertexID.class, new JobVertexIdKeyDeserializer()); + } + + private static class JobVertexIdKeySerializer extends JsonSerializer<JobVertexID> { + @Override + public void serialize(JobVertexID value, JsonGenerator jgen, SerializerProvider provider) + throws IOException { + + jgen.writeFieldName(value.toHexString()); + } + } + + private static class JobVertexIdKeyDeserializer extends KeyDeserializer { + @Override + public Object deserializeKey(String s, DeserializationContext deserializationContext) { + return JobVertexID.fromHexString(s); + } + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java new file mode 100644 index 00000000..406c2e30 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.kubernetes.operator.autoscaler;

import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/** Test for JobTopology parsing logic. */
public class JobTopologyTest {

    @Test
    public void testTopologyFromJson() throws JsonProcessingException {
        // Build a job with two independent pipelines:
        //   (s1, s2) -> map1 -> sink1
        //   s3 -> map2 -> (sink2, sink3)
        // and verify that the JSON plan round-trips into the expected JobTopology.
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        var s1 = env.fromElements(1).name("s1");
        var s2 = env.fromElements(1).name("s2");

        s1.union(s2)
                .shuffle()
                .map(i -> i)
                .name("map1")
                .setParallelism(2)
                .shuffle()
                .print()
                .name("sink1")
                .setParallelism(3);

        var s3 = env.fromElements(1).name("s3");
        var map2 = s3.shuffle().map(i -> i).name("map2").setParallelism(4).shuffle();

        // map2 fans out to two sinks.
        map2.print().name("sink2").setParallelism(5);
        map2.print().name("sink3").setParallelism(6);

        var jobGraph = env.getStreamGraph().getJobGraph();
        var jsonPlan = JsonPlanGenerator.generatePlan(jobGraph);

        // Index vertices by display name and record their max parallelism, falling
        // back to the scheduler default when none was set explicitly (-1).
        var vertices = new HashMap<String, JobVertexID>();
        var maxParallelism = new HashMap<JobVertexID, Integer>();
        for (JobVertex vertex : jobGraph.getVertices()) {
            vertices.put(vertex.getName(), vertex.getID());
            maxParallelism.put(
                    vertex.getID(),
                    vertex.getMaxParallelism() != -1
                            ? vertex.getMaxParallelism()
                            : SchedulerBase.getDefaultMaxParallelism(vertex));
        }

        JobTopology jobTopology = JobTopology.fromJsonPlan(jsonPlan, maxParallelism);

        // Sinks must be present in the outputs map with empty output sets.
        assertTrue(jobTopology.getOutputs().get(vertices.get("Sink: sink1")).isEmpty());
        assertTrue(jobTopology.getOutputs().get(vertices.get("Sink: sink2")).isEmpty());
        assertTrue(jobTopology.getOutputs().get(vertices.get("Sink: sink3")).isEmpty());

        // Edge structure matches the pipeline wiring above.
        assertEquals(
                Set.of(vertices.get("map1")),
                jobTopology.getOutputs().get(vertices.get("Source: s1")));
        assertEquals(
                Set.of(vertices.get("map1")),
                jobTopology.getOutputs().get(vertices.get("Source: s2")));
        assertEquals(
                Set.of(vertices.get("map2")),
                jobTopology.getOutputs().get(vertices.get("Source: s3")));

        assertEquals(
                Set.of(vertices.get("Sink: sink2"), vertices.get("Sink: sink3")),
                jobTopology.getOutputs().get(vertices.get("map2")));

        // Parallelisms are parsed from the plan; max parallelism defaults to 128
        // for every vertex in this job.
        assertEquals(2, jobTopology.getParallelisms().get(vertices.get("map1")));
        assertEquals(4, jobTopology.getParallelisms().get(vertices.get("map2")));
        jobTopology.getMaxParallelisms().forEach((v, p) -> assertEquals(128, p));
    }
}
