This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 7cad846d8704a469229728b494f79d4e1d5c4513 Author: Rui Fan <[email protected]> AuthorDate: Wed Jan 31 17:06:07 2024 +0800 [refactor] Moving the getParallelismHashCode from KubernetesAutoScalerEventHandler to AutoScalerEventHandler --- .../autoscaler/event/AutoScalerEventHandler.java | 17 ++++++++++++++ .../KubernetesAutoScalerEventHandler.java | 26 +++++----------------- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java index 5695d109..df0f0d84 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java @@ -26,6 +26,7 @@ import javax.annotation.Nullable; import java.time.Duration; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; @@ -100,6 +101,22 @@ public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContex return sb.toString(); } + static String getParallelismHashCode(Map<JobVertexID, ScalingSummary> scalingSummaryHashMap) { + return Integer.toString( + scalingSummaryHashMap.entrySet().stream() + .collect( + Collectors.toMap( + e -> e.getKey().toString(), + e -> + String.format( + "Parallelism %d -> %d", + e.getValue() + .getCurrentParallelism(), + e.getValue().getNewParallelism()))) + .hashCode() + & 0x7FFFFFFF); + } + /** The type of the events. */ enum Type { Normal, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java index a08a27ba..1a836a52 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java @@ -30,7 +30,6 @@ import java.time.Duration; import java.util.Map; import java.util.Objects; import java.util.function.Predicate; -import java.util.stream.Collectors; /** An event handler which posts events to the Kubernetes events API. */ public class KubernetesAutoScalerEventHandler @@ -73,7 +72,10 @@ public class KubernetesAutoScalerEventHandler context, scalingSummaries, message, null); } else { var scalingReport = AutoScalerEventHandler.scalingReport(scalingSummaries, message); - var labels = Map.of(PARALLELISM_MAP_KEY, getParallelismHashCode(scalingSummaries)); + var labels = + Map.of( + PARALLELISM_MAP_KEY, + AutoScalerEventHandler.getParallelismHashCode(scalingSummaries)); @Nullable Predicate<Map<String, String>> dedupePredicate = @@ -83,7 +85,8 @@ public class KubernetesAutoScalerEventHandler return stringStringMap != null && Objects.equals( stringStringMap.get(PARALLELISM_MAP_KEY), - getParallelismHashCode(scalingSummaries)); + AutoScalerEventHandler.getParallelismHashCode( + scalingSummaries)); } }; @@ -100,21 +103,4 @@ public class KubernetesAutoScalerEventHandler labels); } } - - private static String getParallelismHashCode( - Map<JobVertexID, ScalingSummary> scalingSummaryHashMap) { - return Integer.toString( - scalingSummaryHashMap.entrySet().stream() - .collect( - Collectors.toMap( - e -> e.getKey().toString(), - e -> - String.format( - "Parallelism %d -> %d", - e.getValue() - .getCurrentParallelism(), - e.getValue().getNewParallelism()))) - .hashCode() - & 0x7FFFFFFF); - } }
