This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 7cad846d8704a469229728b494f79d4e1d5c4513
Author: Rui Fan <[email protected]>
AuthorDate: Wed Jan 31 17:06:07 2024 +0800

    [refactor] Moving the getParallelismHashCode from KubernetesAutoScalerEventHandler to AutoScalerEventHandler
---
 .../autoscaler/event/AutoScalerEventHandler.java   | 17 ++++++++++++++
 .../KubernetesAutoScalerEventHandler.java          | 26 +++++-----------------
 2 files changed, 23 insertions(+), 20 deletions(-)

diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
index 5695d109..df0f0d84 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
@@ -100,6 +101,22 @@ public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContex
         return sb.toString();
     }
 
+    static String getParallelismHashCode(Map<JobVertexID, ScalingSummary> scalingSummaryHashMap) {
+        return Integer.toString(
+                scalingSummaryHashMap.entrySet().stream()
+                                .collect(
+                                        Collectors.toMap(
+                                                e -> e.getKey().toString(),
+                                                e ->
+                                                        String.format(
+                                                                "Parallelism %d -> %d",
+                                                                e.getValue()
+                                                                        .getCurrentParallelism(),
+                                                                e.getValue().getNewParallelism())))
+                                .hashCode()
+                        & 0x7FFFFFFF);
+    }
+
     /** The type of the events. */
     enum Type {
         Normal,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java
index a08a27ba..1a836a52 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java
@@ -30,7 +30,6 @@ import java.time.Duration;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
 /** An event handler which posts events to the Kubernetes events API. */
 public class KubernetesAutoScalerEventHandler
@@ -73,7 +72,10 @@ public class KubernetesAutoScalerEventHandler
                     context, scalingSummaries, message, null);
         } else {
             var scalingReport = AutoScalerEventHandler.scalingReport(scalingSummaries, message);
-            var labels = Map.of(PARALLELISM_MAP_KEY, getParallelismHashCode(scalingSummaries));
+            var labels =
+                    Map.of(
+                            PARALLELISM_MAP_KEY,
+                            AutoScalerEventHandler.getParallelismHashCode(scalingSummaries));
 
             @Nullable
             Predicate<Map<String, String>> dedupePredicate =
@@ -83,7 +85,8 @@ public class KubernetesAutoScalerEventHandler
                             return stringStringMap != null
                                     && Objects.equals(
                                             stringStringMap.get(PARALLELISM_MAP_KEY),
-                                            getParallelismHashCode(scalingSummaries));
+                                            AutoScalerEventHandler.getParallelismHashCode(
+                                                    scalingSummaries));
                         }
                     };
 
@@ -100,21 +103,4 @@ public class KubernetesAutoScalerEventHandler
                     labels);
         }
     }
-
-    private static String getParallelismHashCode(
-            Map<JobVertexID, ScalingSummary> scalingSummaryHashMap) {
-        return Integer.toString(
-                scalingSummaryHashMap.entrySet().stream()
-                                .collect(
-                                        Collectors.toMap(
-                                                e -> e.getKey().toString(),
-                                                e ->
-                                                        String.format(
-                                                                "Parallelism %d -> %d",
-                                                                e.getValue()
-                                                                        .getCurrentParallelism(),
-                                                                e.getValue().getNewParallelism())))
-                                .hashCode()
-                        & 0x7FFFFFFF);
-    }
 }

Reply via email to