This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 398a87c9 [FLINK-34329][autoscaler] Fix the bug that scaling report
parser doesn't support Locale
398a87c9 is described below
commit 398a87c9012ddc79bfe4b2378cea740642283628
Author: Rui Fan <[email protected]>
AuthorDate: Thu Feb 1 17:29:38 2024 +0800
[FLINK-34329][autoscaler] Fix the bug that scaling report parser doesn't
support Locale
---
.../autoscaler/event/AutoscalerEventUtils.java | 22 ++++++++--
.../event/AutoScalerEventHandlerTest.java | 51 +++++++++++++++++-----
2 files changed, 58 insertions(+), 15 deletions(-)
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java
index 1628ebfe..a9a5fda2 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java
@@ -19,6 +19,10 @@ package org.apache.flink.autoscaler.event;
import org.apache.flink.annotation.Experimental;
+import lombok.SneakyThrows;
+
+import java.text.NumberFormat;
+import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
@@ -60,10 +64,22 @@ public class AutoscalerEventUtils {
vertexScalingReport.setVertexId(m.group(1));
vertexScalingReport.setCurrentParallelism(Integer.parseInt(m.group(2)));
vertexScalingReport.setNewParallelism(Integer.parseInt(m.group(3)));
-
vertexScalingReport.setCurrentProcessCapacity(Double.parseDouble(m.group(4)));
-
vertexScalingReport.setExpectedProcessCapacity(Double.parseDouble(m.group(5)));
-
vertexScalingReport.setTargetDataRate(Double.parseDouble(m.group(6)));
+
vertexScalingReport.setCurrentProcessCapacity(convertStringToDouble(m.group(4)));
+
vertexScalingReport.setExpectedProcessCapacity(convertStringToDouble(m.group(5)));
+
vertexScalingReport.setTargetDataRate(convertStringToDouble(m.group(6)));
}
return vertexScalingReport;
}
+
+ @SneakyThrows
+ private static double convertStringToDouble(String str) {
+ try {
+ // Using the NumberFormat to support Locale format because
+ // the event is formatted in String.format, it uses the Locale by
default.
+ return NumberFormat.getInstance().parse(str).doubleValue();
+ } catch (ParseException e) {
+ // NumberFormat doesn't support Infinity and NaN.
+ return Double.parseDouble(str);
+ }
+ }
}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java
index 8a52925c..9c8a720c 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java
@@ -21,10 +21,12 @@ import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import static
org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
@@ -35,16 +37,41 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link AutoScalerEventHandler}. */
class AutoScalerEventHandlerTest {
- @Test
- void testScalingReport() {
+ private static List<Locale> localesProvider() {
+ return List.of(
+ Locale.ENGLISH,
+ Locale.UK,
+ Locale.US,
+ Locale.GERMAN,
+ Locale.CHINA,
+ Locale.CANADA,
+ Locale.ITALIAN,
+ Locale.JAPANESE,
+ Locale.KOREA);
+ }
+
+ @ParameterizedTest
+ @MethodSource("localesProvider")
+ void testScalingReport(Locale locale) {
+ Locale.setDefault(locale);
var expectedJson =
- "Scaling execution enabled, begin scaling vertices:"
- + "{ Vertex ID ea632d67b7d595e5b851708ae9ad79d6 |
Parallelism 3 -> 1 | "
- + "Processing capacity 424.68 -> 123.40 | Target data
rate 403.67}"
- + "{ Vertex ID bc764cd8ddf7a0cff126f51c16239658 |
Parallelism 4 -> 2 | "
- + "Processing capacity Infinity -> Infinity | Target
data rate 812.58}"
- + "{ Vertex ID 0a448493b4782967b150582570326227 |
Parallelism 5 -> 8 | "
- + "Processing capacity 404.73 -> 645.00 | Target data
rate 404.27}";
+ String.format(
+ "Scaling execution enabled, begin scaling vertices:"
+ + "{ Vertex ID
ea632d67b7d595e5b851708ae9ad79d6 | Parallelism 3 -> 1 | "
+ + "Processing capacity %.2f -> %.2f | Target
data rate %.2f}"
+ + "{ Vertex ID
bc764cd8ddf7a0cff126f51c16239658 | Parallelism 4 -> 2 | "
+ + "Processing capacity %.2f -> %.2f | Target
data rate %.2f}"
+ + "{ Vertex ID
0a448493b4782967b150582570326227 | Parallelism 5 -> 8 | "
+ + "Processing capacity %.2f -> %.2f | Target
data rate %.2f}",
+ 12424.68,
+ 123.40,
+ 403.67,
+ Double.POSITIVE_INFINITY,
+ Double.POSITIVE_INFINITY,
+ 812.58,
+ 404.73,
+ 645.00,
+ 404.27);
final var scalingSummaries = buildScalingSummaries();
assertThat(
@@ -68,7 +95,7 @@ class AutoScalerEventHandlerTest {
jobVertex2,
generateScalingSummary(
4, 2, Double.POSITIVE_INFINITY,
Double.POSITIVE_INFINITY, 812.583));
- scalingSummaries.put(jobVertex3, generateScalingSummary(3, 1, 424.678,
123.4, 403.673));
+ scalingSummaries.put(jobVertex3, generateScalingSummary(3, 1,
12424.678, 123.4, 403.673));
scalingSummaries.put(jobVertex1, generateScalingSummary(5, 8, 404.727,
645.0, 404.268));
return scalingSummaries;
@@ -86,7 +113,7 @@ class AutoScalerEventHandlerTest {
Double.POSITIVE_INFINITY,
812.58),
new VertexScalingReport(
- "ea632d67b7d595e5b851708ae9ad79d6", 3, 1, 424.68,
123.4, 403.67));
+ "ea632d67b7d595e5b851708ae9ad79d6", 3, 1, 12424.68,
123.4, 403.67));
}
private ScalingSummary generateScalingSummary(