This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 5aa9956e7c61a5d58fb70a5ef8dd0ccb2dcabf53
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Dec 1 08:55:50 2022 +0100

    [FLINK-30260][autoscaler] Utilities for collecting and computing metrics
---
 .../operator/autoscaler/topology/JobTopology.java  | 154 +++++++++++++++++++++
 .../operator/autoscaler/topology/VertexInfo.java   |  37 +++++
 .../operator/autoscaler/utils/AutoScalerUtils.java |  70 ++++++++++
 .../autoscaler/utils/JobVertexSerDeModule.java     |  54 ++++++++
 .../operator/autoscaler/JobTopologyTest.java       |  99 +++++++++++++
 5 files changed, 414 insertions(+)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
new file mode 100644
index 00000000..3f2462da
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.topology;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** Structure representing information about the jobgraph that is relevant for 
scaling. */
+@ToString
+@EqualsAndHashCode
+public class JobTopology {
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    @Getter private final ImmutableMap<JobVertexID, Set<JobVertexID>> inputs;
+    @Getter private final ImmutableMap<JobVertexID, Set<JobVertexID>> outputs;
+    @Getter private final ImmutableMap<JobVertexID, Integer> parallelisms;
+    private final ImmutableMap<JobVertexID, Integer> originalMaxParallelism;
+    @Getter private final Map<JobVertexID, Integer> maxParallelisms;
+
+    public JobTopology(VertexInfo... vertexInfo) {
+        this(Set.of(vertexInfo));
+    }
+
+    public JobTopology(Set<VertexInfo> vertexInfo) {
+
+        Map<JobVertexID, Set<JobVertexID>> vertexOutputs = new HashMap<>();
+        Map<JobVertexID, Set<JobVertexID>> vertexInputs = new HashMap<>();
+        Map<JobVertexID, Integer> vertexParallelism = new HashMap<>();
+        maxParallelisms = new HashMap<>();
+
+        vertexInfo.forEach(
+                info -> {
+                    var vertexId = info.getId();
+                    vertexParallelism.put(vertexId, info.getParallelism());
+                    maxParallelisms.put(vertexId, info.getMaxParallelism());
+
+                    vertexInputs.put(vertexId, info.getInputs());
+                    vertexOutputs.computeIfAbsent(vertexId, id -> new 
HashSet<>());
+                    info.getInputs()
+                            .forEach(
+                                    inputId ->
+                                            vertexOutputs
+                                                    .computeIfAbsent(inputId, 
id -> new HashSet<>())
+                                                    .add(vertexId));
+                });
+
+        var outputBuilder = ImmutableMap.<JobVertexID, 
Set<JobVertexID>>builder();
+        vertexOutputs.forEach((id, l) -> outputBuilder.put(id, 
ImmutableSet.copyOf(l)));
+        outputs = outputBuilder.build();
+
+        var inputBuilder = ImmutableMap.<JobVertexID, 
Set<JobVertexID>>builder();
+        vertexInputs.forEach((id, l) -> inputBuilder.put(id, 
ImmutableSet.copyOf(l)));
+        this.inputs = inputBuilder.build();
+
+        this.parallelisms = ImmutableMap.copyOf(vertexParallelism);
+        this.originalMaxParallelism = ImmutableMap.copyOf(maxParallelisms);
+    }
+
+    public boolean isSource(JobVertexID jobVertexID) {
+        return getInputs().get(jobVertexID).isEmpty();
+    }
+
+    public void updateMaxParallelism(JobVertexID vertexID, int maxParallelism) 
{
+        maxParallelisms.put(
+                vertexID, Math.min(originalMaxParallelism.get(vertexID), 
maxParallelism));
+    }
+
+    public List<JobVertexID> getVerticesInTopologicalOrder() {
+        List<JobVertexID> sorted = new ArrayList<>(inputs.size());
+
+        Map<JobVertexID, List<JobVertexID>> remainingInputs = new 
HashMap<>(inputs.size());
+        inputs.forEach((v, l) -> remainingInputs.put(v, new ArrayList<>(l)));
+
+        while (!remainingInputs.isEmpty()) {
+            List<JobVertexID> verticesWithZeroIndegree = new ArrayList<>();
+            remainingInputs.forEach(
+                    (v, inputs) -> {
+                        if (inputs.isEmpty()) {
+                            verticesWithZeroIndegree.add(v);
+                        }
+                    });
+
+            verticesWithZeroIndegree.forEach(
+                    v -> {
+                        remainingInputs.remove(v);
+                        outputs.get(v).forEach(o -> 
remainingInputs.get(o).remove(v));
+                    });
+
+            sorted.addAll(verticesWithZeroIndegree);
+        }
+        return sorted;
+    }
+
+    public static JobTopology fromJsonPlan(
+            String jsonPlan, Map<JobVertexID, Integer> maxParallelismMap)
+            throws JsonProcessingException {
+        ObjectNode plan = objectMapper.readValue(jsonPlan, ObjectNode.class);
+        ArrayNode nodes = (ArrayNode) plan.get("nodes");
+
+        var vertexInfo = new HashSet<VertexInfo>();
+
+        for (JsonNode node : nodes) {
+            var vertexId = JobVertexID.fromHexString(node.get("id").asText());
+            var inputList = new HashSet<JobVertexID>();
+            vertexInfo.add(
+                    new VertexInfo(
+                            vertexId,
+                            inputList,
+                            node.get("parallelism").asInt(),
+                            maxParallelismMap.get(vertexId)));
+            if (node.has("inputs")) {
+                for (JsonNode input : node.get("inputs")) {
+                    
inputList.add(JobVertexID.fromHexString(input.get("id").asText()));
+                }
+            }
+        }
+
+        return new JobTopology(vertexInfo);
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
new file mode 100644
index 00000000..1137e9e6
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.topology;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Set;
+
/** Job vertex information. */
@Value
public class VertexInfo {

    // Unique id of the vertex within the job graph.
    JobVertexID id;

    // Ids of the upstream vertices this vertex consumes from; empty for sources.
    Set<JobVertexID> inputs;

    // Current parallelism of the vertex.
    int parallelism;

    // Max parallelism configured (or derived) for the vertex.
    int maxParallelism;
}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
new file mode 100644
index 00000000..a1353f65
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.utils;
+
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+
+import java.util.Map;
+
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+
+/** AutoScaler utilities. */
+public class AutoScalerUtils {
+
+    public static double getTargetProcessingCapacity(
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            Configuration conf,
+            double targetUtilization,
+            boolean withRestart) {
+
+        // Target = Lag Catchup Rate + Restart Catchup Rate + Processing at 
utilization
+        // Target = LAG/CATCH_UP + INPUT_RATE*RESTART/CATCH_UP + 
INPUT_RATE/TARGET_UTIL
+
+        double lagCatchupTargetRate = 
evaluatedMetrics.get(CATCH_UP_DATA_RATE).getCurrent();
+        if (Double.isNaN(lagCatchupTargetRate)) {
+            return Double.NaN;
+        }
+
+        double catchUpTargetSec = 
conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();
+        double restartTimeSec = 
conf.get(AutoScalerOptions.RESTART_TIME).toSeconds();
+
+        targetUtilization = Math.max(0., targetUtilization);
+        targetUtilization = Math.min(1., targetUtilization);
+
+        double avgInputTargetRate = 
evaluatedMetrics.get(TARGET_DATA_RATE).getAverage();
+        if (Double.isNaN(avgInputTargetRate)) {
+            return Double.NaN;
+        }
+
+        if (targetUtilization == 0) {
+            return Double.POSITIVE_INFINITY;
+        }
+
+        double restartCatchupRate =
+                !withRestart || catchUpTargetSec == 0
+                        ? 0
+                        : (avgInputTargetRate * restartTimeSec) / 
catchUpTargetSec;
+        double inputTargetAtUtilization = avgInputTargetRate / 
targetUtilization;
+
+        return Math.round(lagCatchupTargetRate + restartCatchupRate + 
inputTargetAtUtilization);
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
new file mode 100644
index 00000000..5a5195a0
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.utils;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
+import java.io.IOException;
+
+/** Jackson serializer module for {@link JobVertexID}. */
+public class JobVertexSerDeModule extends SimpleModule {
+
+    public JobVertexSerDeModule() {
+        this.addKeySerializer(JobVertexID.class, new 
JobVertexIdKeySerializer());
+        this.addKeyDeserializer(JobVertexID.class, new 
JobVertexIdKeyDeserializer());
+    }
+
+    private static class JobVertexIdKeySerializer extends 
JsonSerializer<JobVertexID> {
+        @Override
+        public void serialize(JobVertexID value, JsonGenerator jgen, 
SerializerProvider provider)
+                throws IOException {
+
+            jgen.writeFieldName(value.toHexString());
+        }
+    }
+
+    private static class JobVertexIdKeyDeserializer extends KeyDeserializer {
+        @Override
+        public Object deserializeKey(String s, DeserializationContext 
deserializationContext) {
+            return JobVertexID.fromHexString(s);
+        }
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
new file mode 100644
index 00000000..406c2e30
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/** Test for JobTopology parsing logic. */
public class JobTopologyTest {

    @Test
    public void testTopologyFromJson() throws JsonProcessingException {
        // Build a job with two independent pipelines:
        //   (s1, s2) -> map1 -> sink1
        //   s3 -> map2 -> (sink2, sink3)
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        var s1 = env.fromElements(1).name("s1");
        var s2 = env.fromElements(1).name("s2");

        s1.union(s2)
                .shuffle()
                .map(i -> i)
                .name("map1")
                .setParallelism(2)
                .shuffle()
                .print()
                .name("sink1")
                .setParallelism(3);

        var s3 = env.fromElements(1).name("s3");
        var map2 = s3.shuffle().map(i -> i).name("map2").setParallelism(4).shuffle();

        map2.print().name("sink2").setParallelism(5);
        map2.print().name("sink3").setParallelism(6);

        var jobGraph = env.getStreamGraph().getJobGraph();
        var jsonPlan = JsonPlanGenerator.generatePlan(jobGraph);

        // Collect the vertex ids by name and the (default) max parallelism per vertex,
        // mirroring what the autoscaler would receive from the scheduler.
        var vertices = new HashMap<String, JobVertexID>();
        var maxParallelism = new HashMap<JobVertexID, Integer>();
        for (JobVertex vertex : jobGraph.getVertices()) {
            vertices.put(vertex.getName(), vertex.getID());
            maxParallelism.put(
                    vertex.getID(),
                    vertex.getMaxParallelism() != -1
                            ? vertex.getMaxParallelism()
                            : SchedulerBase.getDefaultMaxParallelism(vertex));
        }

        JobTopology jobTopology = JobTopology.fromJsonPlan(jsonPlan, maxParallelism);

        // Sinks must have no outputs.
        assertTrue(jobTopology.getOutputs().get(vertices.get("Sink: sink1")).isEmpty());
        assertTrue(jobTopology.getOutputs().get(vertices.get("Sink: sink2")).isEmpty());
        assertTrue(jobTopology.getOutputs().get(vertices.get("Sink: sink3")).isEmpty());

        // Sources feed exactly their downstream map vertex.
        assertEquals(
                Set.of(vertices.get("map1")),
                jobTopology.getOutputs().get(vertices.get("Source: s1")));
        assertEquals(
                Set.of(vertices.get("map1")),
                jobTopology.getOutputs().get(vertices.get("Source: s2")));
        assertEquals(
                Set.of(vertices.get("map2")),
                jobTopology.getOutputs().get(vertices.get("Source: s3")));

        // map2 fans out to both sinks.
        assertEquals(
                Set.of(vertices.get("Sink: sink2"), vertices.get("Sink: sink3")),
                jobTopology.getOutputs().get(vertices.get("map2")));

        // Parallelisms are taken from the JSON plan; max parallelisms fall back to the default.
        assertEquals(2, jobTopology.getParallelisms().get(vertices.get("map1")));
        assertEquals(4, jobTopology.getParallelisms().get(vertices.get("map2")));
        jobTopology.getMaxParallelisms().forEach((v, p) -> assertEquals(128, p));
    }
}

Reply via email to