1996fanrui commented on code in PR #685:
URL: https://github.com/apache/flink-kubernetes-operator/pull/685#discussion_r1367619403


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java:
##########
@@ -43,26 +52,71 @@ public void handleEvent(
             String reason,
             String message,
             @Nullable String messageKey,
-            @Nullable Duration interval) {
-        if (interval == null) {
-            eventRecorder.triggerEvent(
-                    context.getResource(),
-                    EventRecorder.Type.valueOf(type.name()),
-                    reason,
-                    message,
-                    EventRecorder.Component.Operator,
-                    messageKey,
-                    context.getKubernetesClient());
+            @Nonnull Duration interval) {
+        eventRecorder.triggerEvent(
+                context.getResource(),
+                EventRecorder.Type.valueOf(type.name()),
+                reason,
+                message,
+                EventRecorder.Component.Operator,
+                messageKey,
+                context.getKubernetesClient());
+    }
+
+    @Override
+    public void handleScalingEvent(
+            KubernetesJobAutoScalerContext context,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            boolean scaled,
+            @Nonnull Duration interval) {
+        if (scaled) {
+            AutoScalerEventHandler.super.handleScalingEvent(
+                    context, scalingSummaries, scaled, interval);
         } else {
-            eventRecorder.triggerEventByInterval(
+            var conf = context.getConfiguration();
+            var scalingReport = AutoScalerEventHandler.scalingReport(scalingSummaries, scaled);
+            var labels = Map.of(PARALLELISM_MAP_KEY, getParallelismHashCode(scalingSummaries));

Review Comment:
   Hi @clarax, IIUC the current strategy is: when scaling is disabled (`scaled`
is false), we want to deduplicate events based on
`getParallelismHashCode(scalingSummaries)` and the interval, right?
   
   If so, how about leaving `AutoScalerEventHandler` unchanged and using
`getParallelismHashCode(scalingSummaries)` as the messageKey?
   
   And refactor `EventUtils` as follows:
   
   - Ignore repeated messages based on the `generatedMessageKey` and
`interval`.
   - `generatedMessageKey = messageKey != null ? messageKey : message;`
   - For an existing event, when the {@link #MESSAGE_KEY} label exists, its
value is the messageKey; otherwise the message is the messageKey.
   
   Here is my POC commit [1]; it may be clearer. If the POC has the same effect
as the current PR, I prefer the POC.
   
   The interface is clearer, and `AutoScalerEventHandler` doesn't need to care
about the `handleScalingEvent` logic.
   
   Please correct me if my understanding is wrong, thanks~
   
   [1] https://github.com/1996fanrui/flink-kubernetes-operator/commit/14e8f189f4d129eba23050ea37fe8c012f0a22d2
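
A minimal sketch of the deduplication rule proposed in the list above, to make the intended behavior concrete. It is not the code from the POC commit or from `EventUtils`: the class `MessageKeyDeduplicator`, the method `shouldEmit`, and the in-memory map are hypothetical stand-ins for looking up the existing Kubernetes event and reading its last timestamp and `MESSAGE_KEY` label.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration only; the real logic would live in EventUtils. */
public class MessageKeyDeduplicator {
    // Assumption: an in-memory map replaces the lookup of the existing event.
    private final Map<String, Instant> lastEmitted = new HashMap<>();

    /** Falls back to the message itself when no explicit key is supplied. */
    static String generatedMessageKey(String messageKey, String message) {
        return messageKey != null ? messageKey : message;
    }

    /** Returns true if the event should be emitted, false if it is a repeat. */
    public boolean shouldEmit(
            String messageKey, String message, Duration interval, Instant now) {
        String key = generatedMessageKey(messageKey, message);
        Instant last = lastEmitted.get(key);
        // Suppress only when the same key was seen within the interval.
        if (last != null && now.isBefore(last.plus(interval))) {
            return false;
        }
        lastEmitted.put(key, now);
        return true;
    }
}
```

With the parallelism hash passed as the messageKey, two reports with the same parallelism overrides inside one interval collapse into a single event, while a changed hash is emitted immediately.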



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java:
##########
@@ -43,26 +52,71 @@ public void handleEvent(
             String reason,
             String message,
             @Nullable String messageKey,
-            @Nullable Duration interval) {
-        if (interval == null) {
-            eventRecorder.triggerEvent(
-                    context.getResource(),
-                    EventRecorder.Type.valueOf(type.name()),
-                    reason,
-                    message,
-                    EventRecorder.Component.Operator,
-                    messageKey,
-                    context.getKubernetesClient());
+            @Nonnull Duration interval) {

Review Comment:
   `interval` isn't used here, so it has no effect.
   
   Given this implementation, it is worth discussing how the interface should
support the interval capability more cleanly.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java:
##########
@@ -117,53 +119,53 @@ public static boolean createByInterval(
             EventRecorder.Component component,
             Consumer<Event> eventListener,
             @Nullable String messageKey,
-            Duration interval) {
-
+            @Nullable BiPredicate<Map<String, String>, Instant> suppressionPredicate,
+            @Nullable Map<String, String> labels) {
         String eventName =
                 generateEventName(
                         target, type, reason, messageKey != null ? messageKey : message, component);
         Event existing = findExistingEvent(client, target, eventName);
 
         if (existing != null) {
-            if (Objects.equals(existing.getMessage(), message)
-                    && Instant.now()
-                            .isBefore(
-                                    Instant.parse(existing.getLastTimestamp())
-                                            .plusMillis(interval.toMillis()))) {
-                return false;
-            } else {
-                createUpdatedEvent(existing, client, message, eventListener);
+            if (suppressionPredicate != null
+                    && existing.getMetadata() != null
+                    && suppressionPredicate.test(
+                            existing.getMetadata().getLabels(),
+                            Instant.parse(existing.getLastTimestamp()))) {
                 return false;
             }
+            updatedEventWithLabels(existing, client, message, eventListener, labels);
+            return false;
         } else {
-            createNewEvent(
-                    client, target, type, reason, message, component, eventListener, eventName);
+            Event event = buildEvent(target, type, reason, message, component, eventName);
+            setLabels(event, labels);
+            eventListener.accept(client.resource(event).createOrReplace());
             return true;
         }
     }
 
-    private static void createUpdatedEvent(
+    private static void updatedEventWithLabels(
             Event existing,
             KubernetesClient client,
             String message,
-            Consumer<Event> eventListener) {
+            Consumer<Event> eventListener,
+            @Nullable Map<String, String> labels) {
         existing.setLastTimestamp(Instant.now().toString());
         existing.setCount(existing.getCount() + 1);
         existing.setMessage(message);
+        setLabels(existing, labels);
         eventListener.accept(client.resource(existing).createOrReplace());
     }
 
-    private static void createNewEvent(
-            KubernetesClient client,
-            HasMetadata target,
-            EventRecorder.Type type,
-            String reason,
-            String message,
-            EventRecorder.Component component,
-            Consumer<Event> eventListener,
-            String eventName) {
-        Event event = buildEvent(target, type, reason, message, component, eventName);
-        eventListener.accept(client.resource(event).createOrReplace());
+    private static void setLabels(Event existing, @Nullable Map<String, String> labels) {
+        if (existing.getMetadata() == null) {

Review Comment:
   If `labels` is null, is this `setLabels` call necessary?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java:
##########
@@ -117,53 +119,53 @@ public static boolean createByInterval(
             EventRecorder.Component component,
             Consumer<Event> eventListener,
             @Nullable String messageKey,
-            Duration interval) {
-
+            @Nullable BiPredicate<Map<String, String>, Instant> suppressionPredicate,
+            @Nullable Map<String, String> labels) {
         String eventName =
                 generateEventName(
                         target, type, reason, messageKey != null ? messageKey : message, component);
         Event existing = findExistingEvent(client, target, eventName);
 
         if (existing != null) {
-            if (Objects.equals(existing.getMessage(), message)
-                    && Instant.now()
-                            .isBefore(
-                                    Instant.parse(existing.getLastTimestamp())
-                                            .plusMillis(interval.toMillis()))) {
-                return false;
-            } else {
-                createUpdatedEvent(existing, client, message, eventListener);
+            if (suppressionPredicate != null
+                    && existing.getMetadata() != null
+                    && suppressionPredicate.test(
+                            existing.getMetadata().getLabels(),
+                            Instant.parse(existing.getLastTimestamp()))) {
                 return false;
             }
+            updatedEventWithLabels(existing, client, message, eventListener, labels);
+            return false;
         } else {
-            createNewEvent(
-                    client, target, type, reason, message, component, eventListener, eventName);
+            Event event = buildEvent(target, type, reason, message, component, eventName);
+            setLabels(event, labels);
+            eventListener.accept(client.resource(event).createOrReplace());
             return true;
         }
     }
 
-    private static void createUpdatedEvent(
+    private static void updatedEventWithLabels(
             Event existing,
             KubernetesClient client,
             String message,
-            Consumer<Event> eventListener) {
+            Consumer<Event> eventListener,
+            @Nullable Map<String, String> labels) {
         existing.setLastTimestamp(Instant.now().toString());
         existing.setCount(existing.getCount() + 1);
         existing.setMessage(message);
+        setLabels(existing, labels);
         eventListener.accept(client.resource(existing).createOrReplace());
     }
 
-    private static void createNewEvent(
-            KubernetesClient client,
-            HasMetadata target,
-            EventRecorder.Type type,
-            String reason,
-            String message,
-            EventRecorder.Component component,
-            Consumer<Event> eventListener,
-            String eventName) {
-        Event event = buildEvent(target, type, reason, message, component, eventName);
-        eventListener.accept(client.resource(event).createOrReplace());
+    private static void setLabels(Event existing, @Nullable Map<String, String> labels) {
+        if (existing.getMetadata() == null) {
+            var metaData = new ObjectMeta();
+            metaData.setLabels(labels);
+        } else if (existing.getMetadata().getLabels() == null) {
+            existing.getMetadata().setLabels(labels);
+        } else {
+            existing.getMetadata().setLabels(labels);
+        }

Review Comment:
   ```suggestion
           } else {
               existing.getMetadata().setLabels(labels);
           }
   ```
   
   Both branches do the same thing under different conditions, so they can be
merged into one.
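
A minimal sketch of how the merged `setLabels` could look. It also reflects two observations from the quoted diff: the first branch creates an `ObjectMeta` but does not appear to attach it to the event, and a null `labels` argument may not need any work at all. The `Meta` and `Event` classes below are hypothetical stand-ins for the fabric8 `ObjectMeta` and `Event` types so that the example is self-contained, and returning early on null labels is an assumption about the desired behavior, not something the PR states.

```java
import java.util.Map;

public class SetLabelsSketch {
    /** Hypothetical stand-in for ObjectMeta; only labels are modeled. */
    static class Meta {
        private Map<String, String> labels;

        Map<String, String> getLabels() {
            return labels;
        }

        void setLabels(Map<String, String> labels) {
            this.labels = labels;
        }
    }

    /** Hypothetical stand-in for the Kubernetes Event. */
    static class Event {
        private Meta metadata;

        Meta getMetadata() {
            return metadata;
        }

        void setMetadata(Meta metadata) {
            this.metadata = metadata;
        }
    }

    static void setLabels(Event event, Map<String, String> labels) {
        if (labels == null) {
            // Assumption: nothing to set, so existing labels stay untouched.
            return;
        }
        if (event.getMetadata() == null) {
            // Attach the new metadata so the labels are not lost.
            event.setMetadata(new Meta());
        }
        event.getMetadata().setLabels(labels);
    }
}
```

If clearing labels with a null argument is actually intended, the early return should be dropped; the single `setLabels` call at the end still replaces both of the original `else` branches.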



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

