This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 66808eb9 [FLINK-34104][autoscaler] Improve the ScalingReport format of
autoscaling
66808eb9 is described below
commit 66808eb9f2d7d26dcd755fec7fdab5ccaf75f8bc
Author: Rui Fan <[email protected]>
AuthorDate: Wed Jan 17 12:01:49 2024 +0800
[FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling
---
.../autoscaler/event/AutoScalerEventHandler.java | 2 +-
.../autoscaler/event/AutoscalerEventUtils.java | 69 +++++++++++++
.../autoscaler/event/VertexScalingReport.java | 44 ++++++++
.../event/AutoScalerEventHandlerTest.java | 113 +++++++++++++++++++++
4 files changed, 227 insertions(+), 1 deletion(-)
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
index ed818a89..5695d109 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
@@ -40,7 +40,7 @@ import static
org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_
@Experimental
public interface AutoScalerEventHandler<KEY, Context extends
JobAutoScalerContext<KEY>> {
String SCALING_SUMMARY_ENTRY =
- " Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f
-> %.2f | Target data rate %.2f";
+ "{ Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f
-> %.2f | Target data rate %.2f}";
String SCALING_EXECUTION_DISABLED_REASON = "%s:%s, recommended parallelism
change:";
String SCALING_SUMMARY_HEADER_SCALING_EXECUTION_DISABLED =
"Scaling execution disabled by config ";
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java
new file mode 100644
index 00000000..1628ebfe
--- /dev/null
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/** The utils of {@link AutoScalerEventHandler}. */
+@Experimental
+public class AutoscalerEventUtils {
+
+ private static final Pattern SCALING_REPORT_SEPARATOR =
Pattern.compile("\\{(.+?)\\}");
+ private static final Pattern VERTEX_SCALING_REPORT_PATTERN =
+ Pattern.compile(
+ "Vertex ID (.*?) \\| Parallelism (.*?) -> (.*?) \\|
Processing capacity (.*?) -> (.*?) \\| Target data rate (.*)");
+
+ /** Parse the scaling report from original scaling report event. */
+ public static List<VertexScalingReport> parseVertexScalingReports(String
scalingReport) {
+ final List<String> originalVertexScalingReports =
+ extractOriginalVertexScalingReports(scalingReport);
+ return originalVertexScalingReports.stream()
+ .map(AutoscalerEventUtils::extractVertexScalingReport)
+ .collect(Collectors.toList());
+ }
+
+ private static List<String> extractOriginalVertexScalingReports(String
scalingReport) {
+ var result = new ArrayList<String>();
+ var m = SCALING_REPORT_SEPARATOR.matcher(scalingReport);
+
+ while (m.find()) {
+ result.add(m.group(1));
+ }
+ return result;
+ }
+
+ private static VertexScalingReport extractVertexScalingReport(String
vertexScalingReportStr) {
+ final var vertexScalingReport = new VertexScalingReport();
+ var m = VERTEX_SCALING_REPORT_PATTERN.matcher(vertexScalingReportStr);
+
+ if (m.find()) {
+ vertexScalingReport.setVertexId(m.group(1));
+
vertexScalingReport.setCurrentParallelism(Integer.parseInt(m.group(2)));
+
vertexScalingReport.setNewParallelism(Integer.parseInt(m.group(3)));
+
vertexScalingReport.setCurrentProcessCapacity(Double.parseDouble(m.group(4)));
+
vertexScalingReport.setExpectedProcessCapacity(Double.parseDouble(m.group(5)));
+
vertexScalingReport.setTargetDataRate(Double.parseDouble(m.group(6)));
+ }
+ return vertexScalingReport;
+ }
+}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/VertexScalingReport.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/VertexScalingReport.java
new file mode 100644
index 00000000..e397c643
--- /dev/null
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/VertexScalingReport.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.annotation.Experimental;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** The scaling report of single vertex. */
+@Experimental
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class VertexScalingReport {
+
+ private String vertexId;
+
+ private int currentParallelism;
+
+ private int newParallelism;
+
+ private double currentProcessCapacity;
+
+ private double expectedProcessCapacity;
+
+ private double targetDataRate;
+}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java
new file mode 100644
index 00000000..8a52925c
--- /dev/null
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static
org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static
org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link AutoScalerEventHandler}. */
+class AutoScalerEventHandlerTest {
+
+ @Test
+ void testScalingReport() {
+ var expectedJson =
+ "Scaling execution enabled, begin scaling vertices:"
+ + "{ Vertex ID ea632d67b7d595e5b851708ae9ad79d6 |
Parallelism 3 -> 1 | "
+ + "Processing capacity 424.68 -> 123.40 | Target data
rate 403.67}"
+ + "{ Vertex ID bc764cd8ddf7a0cff126f51c16239658 |
Parallelism 4 -> 2 | "
+ + "Processing capacity Infinity -> Infinity | Target
data rate 812.58}"
+ + "{ Vertex ID 0a448493b4782967b150582570326227 |
Parallelism 5 -> 8 | "
+ + "Processing capacity 404.73 -> 645.00 | Target data
rate 404.27}";
+
+ final var scalingSummaries = buildScalingSummaries();
+ assertThat(
+ AutoScalerEventHandler.scalingReport(
+ scalingSummaries,
+ AutoScalerEventHandler
+
.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_ENABLED))
+ .isEqualTo(expectedJson);
+
+
assertThat(AutoscalerEventUtils.parseVertexScalingReports(expectedJson))
+
.containsExactlyInAnyOrderElementsOf(buildExpectedScalingResults());
+ }
+
+ private HashMap<JobVertexID, ScalingSummary> buildScalingSummaries() {
+ final var jobVertex1 =
JobVertexID.fromHexString("0a448493b4782967b150582570326227");
+ final var jobVertex2 =
JobVertexID.fromHexString("bc764cd8ddf7a0cff126f51c16239658");
+ final var jobVertex3 =
JobVertexID.fromHexString("ea632d67b7d595e5b851708ae9ad79d6");
+
+ final var scalingSummaries = new HashMap<JobVertexID,
ScalingSummary>();
+ scalingSummaries.put(
+ jobVertex2,
+ generateScalingSummary(
+ 4, 2, Double.POSITIVE_INFINITY,
Double.POSITIVE_INFINITY, 812.583));
+ scalingSummaries.put(jobVertex3, generateScalingSummary(3, 1, 424.678,
123.4, 403.673));
+ scalingSummaries.put(jobVertex1, generateScalingSummary(5, 8, 404.727,
645.0, 404.268));
+
+ return scalingSummaries;
+ }
+
+ private List<VertexScalingReport> buildExpectedScalingResults() {
+ return List.of(
+ new VertexScalingReport(
+ "0a448493b4782967b150582570326227", 5, 8, 404.73,
645.0, 404.27),
+ new VertexScalingReport(
+ "bc764cd8ddf7a0cff126f51c16239658",
+ 4,
+ 2,
+ Double.POSITIVE_INFINITY,
+ Double.POSITIVE_INFINITY,
+ 812.58),
+ new VertexScalingReport(
+ "ea632d67b7d595e5b851708ae9ad79d6", 3, 1, 424.68,
123.4, 403.67));
+ }
+
+ private ScalingSummary generateScalingSummary(
+ int currentParallelism,
+ int newParallelism,
+ double currentProcessCapacity,
+ double expectedProcessCapacity,
+ double targetDataRate) {
+ final var scalingSummary = new ScalingSummary();
+ scalingSummary.setCurrentParallelism(currentParallelism);
+ scalingSummary.setNewParallelism(newParallelism);
+
+ final var metrics =
+ Map.of(
+ TRUE_PROCESSING_RATE,
+ new EvaluatedScalingMetric(400,
currentProcessCapacity),
+ EXPECTED_PROCESSING_RATE,
+ new EvaluatedScalingMetric(expectedProcessCapacity,
400),
+ TARGET_DATA_RATE,
+ new EvaluatedScalingMetric(400, targetDataRate));
+ scalingSummary.setMetrics(metrics);
+ return scalingSummary;
+ }
+}