This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit b26a0f5f7469c06764ad1bb9f838f51792f49ba8
Author: Rui Fan <[email protected]>
AuthorDate: Thu Aug 22 15:14:44 2024 +0800

    [FLINK-36129][autoscaler] Autoscaler is compatible with Flink 1.20
---
 .../messages/job/metrics/AggregatedMetric.java     | 147 +++++++++++++++++++++
 .../flink/autoscaler/RestApiMetricsCollector.java  |   3 +
 .../flink/autoscaler/metrics/FlinkMetric.java      |   2 +-
 .../flink/autoscaler/metrics/ScalingMetrics.java   |   3 +-
 .../messages/job/metrics/AggregatedMetric.java     | 147 +++++++++++++++++++++
 .../autoscaler/RestApiMetricsCollectorTest.java    |  33 +++--
 .../autoscaler/metrics/ScalingMetricsTest.java     |  31 +++--
 .../flink/autoscaler/metrics/TestMetrics.java      |   6 +-
 .../messages/job/metrics/AggregatedMetric.java     | 147 +++++++++++++++++++++
 9 files changed, 495 insertions(+), 24 deletions(-)

diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
new file mode 100644
index 00000000..ba3efcda
--- /dev/null
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * TODO : It can be removed after upgrading flink dependency to 1.20.
+ *
+ * <p>Response type for aggregated metrics. Contains the metric name and optionally the sum,
+ * average, minimum and maximum.
+ */
+public class AggregatedMetric {
+
+    private static final String FIELD_NAME_ID = "id";
+
+    private static final String FIELD_NAME_MIN = "min";
+
+    private static final String FIELD_NAME_MAX = "max";
+
+    private static final String FIELD_NAME_AVG = "avg";
+
+    private static final String FIELD_NAME_SUM = "sum";
+
+    private static final String FIELD_NAME_SKEW = "skew";
+
+    @JsonProperty(value = FIELD_NAME_ID, required = true)
+    private final String id;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NAME_MIN)
+    private final Double min;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NAME_MAX)
+    private final Double max;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NAME_AVG)
+    private final Double avg;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NAME_SUM)
+    private final Double sum;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NAME_SKEW)
+    private final Double skew;
+
+    @JsonCreator
+    public AggregatedMetric(
+            final @JsonProperty(value = FIELD_NAME_ID, required = true) String id,
+            final @Nullable @JsonProperty(FIELD_NAME_MIN) Double min,
+            final @Nullable @JsonProperty(FIELD_NAME_MAX) Double max,
+            final @Nullable @JsonProperty(FIELD_NAME_AVG) Double avg,
+            final @Nullable @JsonProperty(FIELD_NAME_SUM) Double sum,
+            final @Nullable @JsonProperty(FIELD_NAME_SKEW) Double skew) {
+
+        this.id = requireNonNull(id, "id must not be null");
+        this.min = min;
+        this.max = max;
+        this.avg = avg;
+        this.sum = sum;
+        this.skew = skew;
+    }
+
+    public AggregatedMetric(final @JsonProperty(value = FIELD_NAME_ID, required = true) String id) {
+        this(id, null, null, null, null, null);
+    }
+
+    @JsonIgnore
+    public String getId() {
+        return id;
+    }
+
+    @JsonIgnore
+    public Double getMin() {
+        return min;
+    }
+
+    @JsonIgnore
+    public Double getMax() {
+        return max;
+    }
+
+    @JsonIgnore
+    public Double getSum() {
+        return sum;
+    }
+
+    @JsonIgnore
+    public Double getAvg() {
+        return avg;
+    }
+
+    @JsonIgnore
+    public Double getSkew() {
+        return skew;
+    }
+
+    @Override
+    public String toString() {
+        return "AggregatedMetric{"
+                + "id='"
+                + id
+                + '\''
+                + ", mim='"
+                + min
+                + '\''
+                + ", max='"
+                + max
+                + '\''
+                + ", avg='"
+                + avg
+                + '\''
+                + ", sum='"
+                + sum
+                + '\''
+                + ", skew='"
+                + skew
+                + '\''
+                + '}';
+    }
+}
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
index 09b9bb65..67d77c35 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
@@ -230,6 +230,9 @@ public class RestApiMetricsCollector<KEY, Context extends JobAutoScalerContext<K
                                                 null,
                                                 m1.getSum() != null
                                                         ? m1.getSum() + m2.getSum()
+                                                        : null,
+                                                m1.getSkew() != null
+                                                        ? Math.max(m1.getSkew(), m2.getSkew())
                                                         : null)));
     }
 }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
index 32abd946..12e99ddc 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
@@ -74,6 +74,6 @@ public enum FlinkMetric {
     }
 
     private static AggregatedMetric zero() {
-        return new AggregatedMetric("", 0., 0., 0., 0.);
+        return new AggregatedMetric("", 0., 0., 0., 0., 0.);
     }
 }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
index 84996235..9069978f 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
@@ -230,7 +230,8 @@ public class ScalingMetrics {
                                 Double.NaN,
                                 Double.NaN,
                                 Double.NaN,
-                                (double) ioMetrics.getNumRecordsIn());
+                                (double) ioMetrics.getNumRecordsIn(),
+                                Double.NaN);
 
         // 2. If the former is unavailable and the vertex contains a source operator, use the
         // corresponding source operator metric.
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
new file mode 100644
index 00000000..ba3efcda
--- /dev/null
+++ b/flink-autoscaler/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * TODO : It can be removed after upgrading flink dependency to 1.20.
+ *
+ * <p>Response type for aggregated metrics. Contains the metric name and optionally the sum,
+ * average, minimum and maximum.
+ */
+public class AggregatedMetric {
+
+    private static final String FIELD_NAME_ID = "id";
+
+    private static final String FIELD_NAME_MIN = "min";
+
+    private static final String FIELD_NAME_MAX = "max";
+
+    private static final String FIELD_NAME_AVG = "avg";
+
+    private static final String FIELD_NAME_SUM = "sum";
+
+    private static final String FIELD_NAME_SKEW = "skew";
+
+    @JsonProperty(value = FIELD_NAME_ID, required = true)
+    private final String id;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NAME_MIN)
+    private final Double min;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NAME_MAX)
+    private final Double max;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NAME_AVG)
+    private final Double avg;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NAME_SUM)
+    private final Double sum;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NAME_SKEW)
+    private final Double skew;
+
+    @JsonCreator
+    public AggregatedMetric(
+            final @JsonProperty(value = FIELD_NAME_ID, required = true) String id,
+            final @Nullable @JsonProperty(FIELD_NAME_MIN) Double min,
+            final @Nullable @JsonProperty(FIELD_NAME_MAX) Double max,
+            final @Nullable @JsonProperty(FIELD_NAME_AVG) Double avg,
+            final @Nullable @JsonProperty(FIELD_NAME_SUM) Double sum,
+            final @Nullable @JsonProperty(FIELD_NAME_SKEW) Double skew) {
+
+        this.id = requireNonNull(id, "id must not be null");
+        this.min = min;
+        this.max = max;
+        this.avg = avg;
+        this.sum = sum;
+        this.skew = skew;
+    }
+
+    public AggregatedMetric(final @JsonProperty(value = FIELD_NAME_ID, required = true) String id) {
+        this(id, null, null, null, null, null);
+    }
+
+    @JsonIgnore
+    public String getId() {
+        return id;
+    }
+
+    @JsonIgnore
+    public Double getMin() {
+        return min;
+    }
+
+    @JsonIgnore
+    public Double getMax() {
+        return max;
+    }
+
+    @JsonIgnore
+    public Double getSum() {
+        return sum;
+    }
+
+    @JsonIgnore
+    public Double getAvg() {
+        return avg;
+    }
+
+    @JsonIgnore
+    public Double getSkew() {
+        return skew;
+    }
+
+    @Override
+    public String toString() {
+        return "AggregatedMetric{"
+                + "id='"
+                + id
+                + '\''
+                + ", mim='"
+                + min
+                + '\''
+                + ", max='"
+                + max
+                + '\''
+                + ", avg='"
+                + avg
+                + '\''
+                + ", sum='"
+                + sum
+                + '\''
+                + ", skew='"
+                + skew
+                + '\''
+                + '}';
+    }
+}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
index 0cf5b15a..9a5fd718 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
@@ -77,11 +77,26 @@ public class RestApiMetricsCollectorTest {
         var aggregatedMetricsResponse =
                 List.of(
                         new AggregatedMetric(
-                                "a.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
+                                "a.pendingRecords",
+                                Double.NaN,
+                                Double.NaN,
+                                Double.NaN,
+                                100.,
+                                Double.NaN),
                         new AggregatedMetric(
-                                "b.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
+                                "b.pendingRecords",
+                                Double.NaN,
+                                Double.NaN,
+                                Double.NaN,
+                                100.,
+                                Double.NaN),
                         new AggregatedMetric(
-                                "c.unrelated", Double.NaN, Double.NaN, Double.NaN, 100.));
+                                "c.unrelated",
+                                Double.NaN,
+                                Double.NaN,
+                                Double.NaN,
+                                100.,
+                                Double.NaN));
 
         var conf = new Configuration();
         var restClusterClient =
@@ -233,10 +248,12 @@ public class RestApiMetricsCollectorTest {
         assertThrows(RuntimeException.class, () -> collector.queryTmMetrics(context));
 
         // Test only heap metrics available
-        var heapMax = new AggregatedMetric(HEAP_MAX_NAME, null, 100., null, null);
-        var heapUsed = new AggregatedMetric(HEAP_USED_NAME, null, 50., null, null);
-        var managedUsed = new AggregatedMetric(MANAGED_MEMORY_NAME, null, 42., null, null);
-        var metaspaceUsed = new AggregatedMetric(METASPACE_MEMORY_NAME, null, 11., null, null);
+        var heapMax = new AggregatedMetric(HEAP_MAX_NAME, null, 100., null, null, Double.NaN);
+        var heapUsed = new AggregatedMetric(HEAP_USED_NAME, null, 50., null, null, Double.NaN);
+        var managedUsed =
+                new AggregatedMetric(MANAGED_MEMORY_NAME, null, 42., null, null, Double.NaN);
+        var metaspaceUsed =
+                new AggregatedMetric(METASPACE_MEMORY_NAME, null, 11., null, null, Double.NaN);
         metricValues.put(HEAP_MAX_NAME, heapMax);
         metricValues.put(HEAP_USED_NAME, heapUsed);
         metricValues.put(MANAGED_MEMORY_NAME, managedUsed);
@@ -256,7 +273,7 @@ public class RestApiMetricsCollectorTest {
         collector.cleanup(context.getJobKey());
 
         // Test all metrics available
-        var gcTime = new AggregatedMetric(GC_METRIC_NAME, null, 150., null, null);
+        var gcTime = new AggregatedMetric(GC_METRIC_NAME, null, 150., null, null, Double.NaN);
         metricValues.put(GC_METRIC_NAME, gcTime);
 
         assertMetricsEquals(
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
index 4b417ad4..aa9df2fc 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
@@ -61,7 +61,8 @@ public class ScalingMetricsTest {
                 source,
                 Map.of(
                         FlinkMetric.BUSY_TIME_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, 900., Double.NaN, Double.NaN),
+                        new AggregatedMetric(
+                                "", Double.NaN, 900., Double.NaN, Double.NaN, Double.NaN),
                         FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC,
                         aggSum(1000.)),
                 scalingMetrics,
@@ -84,7 +85,8 @@ public class ScalingMetricsTest {
                 op,
                 Map.of(
                         FlinkMetric.BUSY_TIME_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, 100., Double.NaN, Double.NaN),
+                        new AggregatedMetric(
+                                "", Double.NaN, 100., Double.NaN, Double.NaN, Double.NaN),
                         FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC,
                         aggSum(1000.)),
                 scalingMetrics,
@@ -141,7 +143,7 @@ public class ScalingMetricsTest {
                 source,
                 Map.of(
                         FlinkMetric.BUSY_TIME_PER_SEC,
-                        new AggregatedMetric("", 100., 200., 150., Double.NaN)),
+                        new AggregatedMetric("", 100., 200., 150., Double.NaN, Double.NaN)),
                 scalingMetrics,
                 ioMetrics,
                 conf);
@@ -155,7 +157,7 @@ public class ScalingMetricsTest {
                 source,
                 Map.of(
                         FlinkMetric.BUSY_TIME_PER_SEC,
-                        new AggregatedMetric("", 100., 200., 150., Double.NaN)),
+                        new AggregatedMetric("", 100., 200., 150., Double.NaN, Double.NaN)),
                 scalingMetrics,
                 ioMetrics,
                 conf);
@@ -169,7 +171,7 @@ public class ScalingMetricsTest {
                 source,
                 Map.of(
                         FlinkMetric.BUSY_TIME_PER_SEC,
-                        new AggregatedMetric("", 100., 200., 150., Double.NaN)),
+                        new AggregatedMetric("", 100., 200., 150., Double.NaN, Double.NaN)),
                 scalingMetrics,
                 ioMetrics,
                 conf);
@@ -232,12 +234,19 @@ public class ScalingMetricsTest {
                 SOURCE,
                 Map.of(
                         FlinkMetric.BUSY_TIME_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, busyness, Double.NaN, Double.NaN),
+                        new AggregatedMetric(
+                                "", Double.NaN, busyness, Double.NaN, Double.NaN, Double.NaN),
                         FlinkMetric.BACKPRESSURE_TIME_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, backpressure, Double.NaN),
+                        new AggregatedMetric(
+                                "", Double.NaN, Double.NaN, backpressure, Double.NaN, Double.NaN),
                         FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
                         new AggregatedMetric(
-                                "", Double.NaN, Double.NaN, Double.NaN, processingRate)),
+                                "",
+                                Double.NaN,
+                                Double.NaN,
+                                Double.NaN,
+                                processingRate,
+                                Double.NaN)),
                 scalingMetrics,
                 topology,
                 conf,
@@ -279,14 +288,14 @@ public class ScalingMetricsTest {
     }
 
     private static AggregatedMetric aggSum(double sum) {
-        return new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, sum);
+        return new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, sum, Double.NaN);
     }
 
     private static AggregatedMetric aggMax(double max) {
-        return new AggregatedMetric("", Double.NaN, max, Double.NaN, Double.NaN);
+        return new AggregatedMetric("", Double.NaN, max, Double.NaN, Double.NaN, Double.NaN);
     }
 
     private static AggregatedMetric aggAvgMax(double avg, double max) {
-        return new AggregatedMetric("", Double.NaN, max, avg, Double.NaN);
+        return new AggregatedMetric("", Double.NaN, max, avg, Double.NaN, Double.NaN);
     }
 }
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestMetrics.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestMetrics.java
index 9fccca3c..5317ddec 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestMetrics.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestMetrics.java
@@ -58,14 +58,14 @@ public class TestMetrics {
     }
 
     private AggregatedMetric sum(double d) {
-        return new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, d);
+        return new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, d, Double.NaN);
     }
 
     private AggregatedMetric max(double d) {
-        return new AggregatedMetric("", Double.NaN, d, Double.NaN, Double.NaN);
+        return new AggregatedMetric("", Double.NaN, d, Double.NaN, Double.NaN, Double.NaN);
     }
 
     private AggregatedMetric avg(double d) {
-        return new AggregatedMetric("", Double.NaN, Double.NaN, d, Double.NaN);
+        return new AggregatedMetric("", Double.NaN, Double.NaN, d, Double.NaN, Double.NaN);
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
new file mode 100644
index 00000000..ba3efcda
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * TODO : It can be removed after upgrading flink dependency to 1.20.
+ *
+ * <p>Response type for aggregated metrics. Contains the metric name and optionally the sum,
+ * average, minimum and maximum.
+ */
+public class AggregatedMetric {
+
+    private static final String FIELD_NAME_ID = "id";
+
+    private static final String FIELD_NAME_MIN = "min";
+
+    private static final String FIELD_NAME_MAX = "max";
+
+    private static final String FIELD_NAME_AVG = "avg";
+
+    private static final String FIELD_NAME_SUM = "sum";
+
+    private static final String FIELD_NAME_SKEW = "skew";
+
+    @JsonProperty(value = FIELD_NAME_ID, required = true)
+    private final String id;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NAME_MIN)
+    private final Double min;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NAME_MAX)
+    private final Double max;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NAME_AVG)
+    private final Double avg;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NAME_SUM)
+    private final Double sum;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NAME_SKEW)
+    private final Double skew;
+
+    @JsonCreator
+    public AggregatedMetric(
+            final @JsonProperty(value = FIELD_NAME_ID, required = true) String id,
+            final @Nullable @JsonProperty(FIELD_NAME_MIN) Double min,
+            final @Nullable @JsonProperty(FIELD_NAME_MAX) Double max,
+            final @Nullable @JsonProperty(FIELD_NAME_AVG) Double avg,
+            final @Nullable @JsonProperty(FIELD_NAME_SUM) Double sum,
+            final @Nullable @JsonProperty(FIELD_NAME_SKEW) Double skew) {
+
+        this.id = requireNonNull(id, "id must not be null");
+        this.min = min;
+        this.max = max;
+        this.avg = avg;
+        this.sum = sum;
+        this.skew = skew;
+    }
+
+    public AggregatedMetric(final @JsonProperty(value = FIELD_NAME_ID, required = true) String id) {
+        this(id, null, null, null, null, null);
+    }
+
+    @JsonIgnore
+    public String getId() {
+        return id;
+    }
+
+    @JsonIgnore
+    public Double getMin() {
+        return min;
+    }
+
+    @JsonIgnore
+    public Double getMax() {
+        return max;
+    }
+
+    @JsonIgnore
+    public Double getSum() {
+        return sum;
+    }
+
+    @JsonIgnore
+    public Double getAvg() {
+        return avg;
+    }
+
+    @JsonIgnore
+    public Double getSkew() {
+        return skew;
+    }
+
+    @Override
+    public String toString() {
+        return "AggregatedMetric{"
+                + "id='"
+                + id
+                + '\''
+                + ", mim='"
+                + min
+                + '\''
+                + ", max='"
+                + max
+                + '\''
+                + ", avg='"
+                + avg
+                + '\''
+                + ", sum='"
+                + sum
+                + '\''
+                + ", skew='"
+                + skew
+                + '\''
+                + '}';
+    }
+}

Reply via email to