This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 305498a9 [FLINK-33187] Don't record duplicate event if no change
305498a9 is described below

commit 305498a9ab2e04ab71a4c2d87f2edb746373df1a
Author: clarax <[email protected]>
AuthorDate: Sun Oct 8 23:12:41 2023 -0700

    [FLINK-33187] Don't record duplicate event if no change
---
 .../generated/auto_scaler_configuration.html       |   6 +
 .../operator/autoscaler/ScalingExecutor.java       |  26 ++-
 .../autoscaler/config/AutoScalerOptions.java       |   6 +
 .../operator/autoscaler/ScalingExecutorTest.java   |  38 +++-
 .../kubernetes/operator/utils/EventRecorder.java   |  22 +++
 .../kubernetes/operator/utils/EventUtils.java      |  92 ++++++++--
 .../kubernetes/operator/utils/EventUtilsTest.java  | 202 +++++++++++++++++++++
 7 files changed, 362 insertions(+), 30 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html 
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 95af5541..8b9d02b7 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -92,6 +92,12 @@
             <td>Boolean</td>
             <td>Enable vertex scaling execution by the autoscaler. If 
disabled, the autoscaler will only collect metrics and evaluate the suggested 
parallelism for each vertex but will not upgrade the jobs.</td>
         </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.job.autoscaler.scaling.report.interval</h5></td>
+            <td style="word-wrap: break-word;">30 min</td>
+            <td>Duration</td>
+            <td>Time interval to resend the identical event</td>
+        </tr>
         <tr>
             
<td><h5>kubernetes.operator.job.autoscaler.stabilization.interval</h5></td>
             <td style="word-wrap: break-word;">5 min</td>
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index 1d6a9cd0..6367381c 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -94,17 +94,27 @@ public class ScalingExecutor {
         var scalingEnabled = conf.get(SCALING_ENABLED);
 
         var scalingReport = scalingReport(scalingSummaries, scalingEnabled);
-        eventRecorder.triggerEvent(
-                resource,
-                EventRecorder.Type.Normal,
-                EventRecorder.Reason.ScalingReport,
-                EventRecorder.Component.Operator,
-                scalingReport,
-                "ScalingExecutor",
-                client);
 
         if (!scalingEnabled) {
+            eventRecorder.triggerEventByInterval(
+                    resource,
+                    EventRecorder.Type.Normal,
+                    EventRecorder.Reason.ScalingReport,
+                    EventRecorder.Component.Operator,
+                    scalingReport,
+                    "ScalingExecutor",
+                    client,
+                    conf.get(AutoScalerOptions.SCALING_REPORT_INTERVAL));
             return false;
+        } else {
+            eventRecorder.triggerEvent(
+                    resource,
+                    EventRecorder.Type.Normal,
+                    EventRecorder.Reason.ScalingReport,
+                    EventRecorder.Component.Operator,
+                    scalingReport,
+                    "ScalingExecutor",
+                    client);
         }
 
         scalingInformation.addToScalingHistory(clock.instant(), 
scalingSummaries, conf);
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
index df11e28a..276201dc 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -168,4 +168,10 @@ public class AutoScalerOptions {
                     .defaultValues()
                     .withDescription(
                             "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+
+    /**
+     * Minimum time that has to pass before an identical scaling report event is emitted again.
+     * Defaults to 30 minutes.
+     */
+    public static final ConfigOption<Duration> SCALING_REPORT_INTERVAL =
+            autoScalerConfig("scaling.report.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(30))
+                    .withDescription("Time interval to resend the identical event");
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index f3a26525..c315300b 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -169,16 +169,47 @@ public class ScalingExecutorTest {
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testScalingEvents(boolean scalingEnabled) {
+    public void testScalingEventsWith0Interval(boolean scalingEnabled) {
+        // Zero interval: identical scaling reports are expected to be re-emitted on every call.
+        testScalingEvents(scalingEnabled, Duration.ZERO);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testScalingEventsWithInterval(boolean scalingEnabled) {
+        // 30 minute interval: a repeated identical report is expected to be de-duplicated
+        // when scaling is disabled.
+        testScalingEvents(scalingEnabled, Duration.ofMinutes(30));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testScalingEventsWithDefaultInterval(boolean scalingEnabled) {
+        testScalingEvents(scalingEnabled, null);
+    }
+
+    private void testScalingEvents(boolean scalingEnabled, Duration interval) {
         var jobVertexID = new JobVertexID();
         conf.set(AutoScalerOptions.SCALING_ENABLED, scalingEnabled);
+
         var metrics = Map.of(jobVertexID, evaluated(1, 110, 100));
+
+        if (interval != null) {
+            conf.set(AutoScalerOptions.SCALING_REPORT_INTERVAL, interval);
+        }
+
         var scalingInfo = new AutoScalerInfo(new HashMap<>());
         assertEquals(
                 scalingEnabled,
                 scalingDecisionExecutor.scaleResource(
                         flinkDep, scalingInfo, conf, metrics, 
kubernetesClient));
-        assertEquals(1, eventCollector.events.size());
+        assertEquals(
+                scalingEnabled,
+                scalingDecisionExecutor.scaleResource(
+                        flinkDep, scalingInfo, conf, metrics, 
kubernetesClient));
+        assertEquals(
+                (interval == null || (!interval.isNegative() && 
!interval.isZero()))
+                                && !scalingEnabled
+                        ? 1
+                        : 2,
+                eventCollector.events.size());
         var event = eventCollector.events.poll();
         assertTrue(
                 event.getMessage()
@@ -200,6 +231,9 @@ public class ScalingExecutorTest {
         assertEquals(EventRecorder.Reason.ScalingReport.name(), 
event.getReason());
 
         metrics = Map.of(jobVertexID, evaluated(1, 110, 101));
+
+        assertEquals(1, event.getCount());
+
         assertEquals(
                 scalingEnabled,
                 scalingDecisionExecutor.scaleResource(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index bc5eab3c..09c176f4 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -28,6 +28,7 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.function.BiConsumer;
 
@@ -112,6 +113,27 @@ public class EventRecorder {
                 messageKey);
     }
 
+    /**
+     * Records an event for {@code resource} through {@link EventUtils#createByInterval}, which
+     * leaves an existing event untouched when it already carries the same message and was last
+     * recorded less than {@code interval} ago.
+     *
+     * @param messageKey optional key used instead of the message to derive the event name
+     * @param interval minimum time before an identical event is recorded again
+     * @return {@code true} if a new event object was created, {@code false} otherwise
+     */
+    public boolean triggerEventByInterval(
+            AbstractFlinkResource<?, ?> resource,
+            Type type,
+            Reason reason,
+            Component component,
+            String message,
+            @Nullable String messageKey,
+            KubernetesClient client,
+            Duration interval) {
+        String reasonName = reason.toString();
+        return EventUtils.createByInterval(
+                client,
+                resource,
+                type,
+                reasonName,
+                message,
+                component,
+                event -> eventListener.accept(resource, event),
+                messageKey,
+                interval);
+    }
+
     public boolean triggerEvent(
             AbstractFlinkResource<?, ?> resource,
             Type type,
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
index e4b6cc7b..dfdbdd9e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
@@ -25,7 +25,9 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.time.Instant;
+import java.util.Objects;
 import java.util.function.Consumer;
 
 /**
@@ -61,24 +63,16 @@ public class EventUtils {
             EventRecorder.Component component,
             Consumer<Event> eventListener,
             @Nullable String messageKey) {
-
-        String eventName =
-                generateEventName(
-                        target, type, reason, messageKey != null ? messageKey 
: message, component);
-        Event existing = findExistingEvent(client, target, eventName);
-
-        if (existing != null) {
-            // update
-            existing.setLastTimestamp(Instant.now().toString());
-            existing.setCount(existing.getCount() + 1);
-            existing.setMessage(message);
-            eventListener.accept(client.resource(existing).createOrReplace());
-            return false;
-        } else {
-            Event event = buildEvent(target, type, reason, message, component, 
eventName);
-            eventListener.accept(client.resource(event).createOrReplace());
-            return true;
-        }
+        return createByInterval(
+                client,
+                target,
+                type,
+                reason,
+                message,
+                component,
+                eventListener,
+                messageKey,
+                Duration.ofSeconds(0));
     }
 
     private static Event findExistingEvent(
@@ -108,12 +102,70 @@ public class EventUtils {
         if (existing != null) {
             return false;
         } else {
-            Event event = buildEvent(target, type, reason, message, component, 
eventName);
-            eventListener.accept(client.resource(event).createOrReplace());
+            createNewEvent(
+                    client, target, type, reason, message, component, 
eventListener, eventName);
+            return true;
+        }
+    }
+
+    /**
+     * Creates a new event for {@code target}, or updates the existing event with the same name.
+     *
+     * <p>The update is skipped when the existing event already carries {@code message} and was
+     * last recorded less than {@code interval} ago, so identical events are not re-sent on every
+     * call. A {@code null}, zero or negative interval never suppresses the update, and neither
+     * does an existing event without a last timestamp.
+     *
+     * @param messageKey optional key used instead of {@code message} to derive the event name
+     * @param interval minimum time before an identical event is recorded again
+     * @return {@code true} if a new event was created, {@code false} if an event with the same
+     *     name already existed (whether or not it was updated)
+     */
+    public static boolean createByInterval(
+            KubernetesClient client,
+            HasMetadata target,
+            EventRecorder.Type type,
+            String reason,
+            String message,
+            EventRecorder.Component component,
+            Consumer<Event> eventListener,
+            @Nullable String messageKey,
+            @Nullable Duration interval) {
+
+        String eventName =
+                generateEventName(
+                        target, type, reason, messageKey != null ? messageKey : message, component);
+        Event existing = findExistingEvent(client, target, eventName);
+
+        if (existing != null) {
+            if (!isDuplicateWithinInterval(existing, message, interval)) {
+                createUpdatedEvent(existing, client, message, eventListener);
+            }
+            return false;
+        } else {
+            createNewEvent(
+                    client, target, type, reason, message, component, eventListener, eventName);
             return true;
         }
     }
+
+    /**
+     * Returns whether {@code existing} already records {@code message} and was last updated less
+     * than {@code interval} ago. Non-positive or null intervals and missing timestamps never count
+     * as duplicates, so the caller falls back to updating the event.
+     */
+    private static boolean isDuplicateWithinInterval(
+            Event existing, String message, @Nullable Duration interval) {
+        if (interval == null || interval.isNegative() || interval.isZero()) {
+            // create(...) passes a zero interval and must keep its always-update behaviour,
+            // even if the stored timestamp lies in the future (e.g. clock skew).
+            return false;
+        }
+        if (!Objects.equals(existing.getMessage(), message)) {
+            return false;
+        }
+        String lastTimestamp = existing.getLastTimestamp();
+        if (lastTimestamp == null) {
+            // Guard against an unset lastTimestamp; Instant.parse(null) would throw an NPE.
+            return false;
+        }
+        return Instant.now().isBefore(Instant.parse(lastTimestamp).plus(interval));
+    }
 
+    /**
+     * Refreshes {@code existing} in place (last timestamp, incremented count, new message), writes
+     * it back with {@code createOrReplace} and passes the stored result to {@code eventListener}.
+     */
+    private static void createUpdatedEvent(
+            Event existing,
+            KubernetesClient client,
+            String message,
+            Consumer<Event> eventListener) {
+        String now = Instant.now().toString();
+        existing.setLastTimestamp(now);
+        existing.setCount(existing.getCount() + 1);
+        existing.setMessage(message);
+        Event stored = client.resource(existing).createOrReplace();
+        eventListener.accept(stored);
+    }
+
+    /**
+     * Builds a fresh event named {@code eventName}, persists it with {@code createOrReplace} and
+     * passes the stored result to {@code eventListener}.
+     */
+    private static void createNewEvent(
+            KubernetesClient client,
+            HasMetadata target,
+            EventRecorder.Type type,
+            String reason,
+            String message,
+            EventRecorder.Component component,
+            Consumer<Event> eventListener,
+            String eventName) {
+        Event stored =
+                client.resource(buildEvent(target, type, reason, message, component, eventName))
+                        .createOrReplace();
+        eventListener.accept(stored);
+    }
+
     private static Event buildEvent(
             HasMetadata target,
             EventRecorder.Type type,
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
index 168ec2f5..c58b020b 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
@@ -26,6 +26,7 @@ import 
io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.function.Consumer;
 
 /** Test for {@link EventUtils}. */
@@ -111,6 +112,84 @@ public class EventUtilsTest {
                         null));
     }
 
+    /**
+     * Exercises {@link EventUtils#createByInterval} without a message key: the first call creates
+     * the event, an identical call inside the 1800s interval is suppressed, and a call with a
+     * {@code null} message (which derives a different event name) creates a new event.
+     */
+    @Test
+    public void testCreateUpdatedEvent() {
+        var consumer =
+                new Consumer<Event>() {
+                    @Override
+                    public void accept(Event event) {
+                        eventConsumed = event;
+                    }
+                };
+        var flinkApp = TestUtils.buildApplicationCluster();
+        var reason = "Cleanup";
+        var message = "message";
+        var eventName =
+                EventUtils.generateEventName(
+                        flinkApp,
+                        EventRecorder.Type.Warning,
+                        reason,
+                        message,
+                        EventRecorder.Component.Operator);
+        // First call: no event with this name exists yet, so a new one is created.
+        Assertions.assertTrue(
+                EventUtils.createByInterval(
+                        kubernetesClient,
+                        flinkApp,
+                        EventRecorder.Type.Warning,
+                        reason,
+                        message,
+                        EventRecorder.Component.Operator,
+                        consumer,
+                        null,
+                        Duration.ofSeconds(1800)));
+        var event =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(flinkApp.getMetadata().getNamespace())
+                        .withName(eventName)
+                        .get();
+        Assertions.assertNotNull(event);
+        Assertions.assertEquals(eventConsumed, event);
+        Assertions.assertEquals(1, event.getCount());
+        Assertions.assertEquals(reason, event.getReason());
+
+        // NOTE(review): despite the test name, this identical call within the interval is
+        // expected to be suppressed: the listener is not invoked and the count stays at 1.
+        eventConsumed = null;
+        Assertions.assertFalse(
+                EventUtils.createByInterval(
+                        kubernetesClient,
+                        flinkApp,
+                        EventRecorder.Type.Warning,
+                        reason,
+                        message,
+                        EventRecorder.Component.Operator,
+                        consumer,
+                        null,
+                        Duration.ofSeconds(1800)));
+        event =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(flinkApp.getMetadata().getNamespace())
+                        .withName(eventName)
+                        .get();
+        Assertions.assertNotNull(event);
+        Assertions.assertNull(eventConsumed);
+        Assertions.assertEquals(1, event.getCount());
+
+        // Without a message key the name is derived from the (null) message, so this is a new
+        // event rather than an update of the one above.
+        Assertions.assertTrue(
+                EventUtils.createByInterval(
+                        kubernetesClient,
+                        flinkApp,
+                        EventRecorder.Type.Warning,
+                        reason,
+                        null,
+                        EventRecorder.Component.Operator,
+                        consumer,
+                        null,
+                        Duration.ofSeconds(1800)));
+    }
+
     @Test
     public void testCreateWithMessageKey() {
         var consumer =
@@ -174,6 +253,129 @@ public class EventUtilsTest {
         Assertions.assertEquals(2, event.getCount());
     }
 
+    /**
+     * Exercises {@link EventUtils#createByInterval} with a message key: the key fixes the event
+     * name, a changed message updates the existing event, the same message inside the interval is
+     * suppressed, and a different key creates a separate event.
+     */
+    @Test
+    public void testCreateByIntervalWithMessageKey() {
+        var consumer =
+                new Consumer<Event>() {
+                    @Override
+                    public void accept(Event event) {
+                        eventConsumed = event;
+                    }
+                };
+        var flinkApp = TestUtils.buildApplicationCluster();
+        var reason = "Cleanup";
+        var eventName =
+                EventUtils.generateEventName(
+                        flinkApp,
+                        EventRecorder.Type.Warning,
+                        reason,
+                        "mk",
+                        EventRecorder.Component.Operator);
+
+        // First call creates the event whose name is derived from key "mk".
+        eventConsumed = null;
+        Assertions.assertTrue(
+                EventUtils.createByInterval(
+                        kubernetesClient,
+                        flinkApp,
+                        EventRecorder.Type.Warning,
+                        reason,
+                        "message1",
+                        EventRecorder.Component.Operator,
+                        consumer,
+                        "mk",
+                        Duration.ofSeconds(1800)));
+        var event =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(flinkApp.getMetadata().getNamespace())
+                        .withName(eventName)
+                        .get();
+        Assertions.assertNotNull(event);
+        Assertions.assertEquals(eventConsumed, event);
+        Assertions.assertEquals("message1", event.getMessage());
+        Assertions.assertEquals(1, event.getCount());
+
+        // Same key, different message: the existing event is updated (count 2) even though the
+        // interval has not elapsed.
+        eventConsumed = null;
+        Assertions.assertFalse(
+                EventUtils.createByInterval(
+                        kubernetesClient,
+                        flinkApp,
+                        EventRecorder.Type.Warning,
+                        reason,
+                        "message2",
+                        EventRecorder.Component.Operator,
+                        consumer,
+                        "mk",
+                        Duration.ofSeconds(1800)));
+
+        event =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(flinkApp.getMetadata().getNamespace())
+                        .withName(eventName)
+                        .get();
+        Assertions.assertNotNull(event);
+        Assertions.assertEquals(eventConsumed, event);
+        Assertions.assertEquals("message2", event.getMessage());
+        Assertions.assertEquals(2, event.getCount());
+
+        // Same key and same message within the interval: suppressed, listener not invoked.
+        eventConsumed = null;
+        Assertions.assertFalse(
+                EventUtils.createByInterval(
+                        kubernetesClient,
+                        flinkApp,
+                        EventRecorder.Type.Warning,
+                        reason,
+                        "message2",
+                        EventRecorder.Component.Operator,
+                        consumer,
+                        "mk",
+                        Duration.ofSeconds(1800)));
+
+        event =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(flinkApp.getMetadata().getNamespace())
+                        .withName(eventName)
+                        .get();
+        Assertions.assertNotNull(event);
+        Assertions.assertNull(eventConsumed);
+
+        // A new key "mk2" derives a new event name, so a fresh event (count 1) is created.
+        eventConsumed = null;
+        Assertions.assertTrue(
+                EventUtils.createByInterval(
+                        kubernetesClient,
+                        flinkApp,
+                        EventRecorder.Type.Warning,
+                        reason,
+                        "message2",
+                        EventRecorder.Component.Operator,
+                        consumer,
+                        "mk2",
+                        Duration.ofSeconds(1800)));
+        eventName =
+                EventUtils.generateEventName(
+                        flinkApp,
+                        EventRecorder.Type.Warning,
+                        reason,
+                        "mk2",
+                        EventRecorder.Component.Operator);
+        event =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(flinkApp.getMetadata().getNamespace())
+                        .withName(eventName)
+                        .get();
+        Assertions.assertNotNull(event);
+        Assertions.assertEquals(eventConsumed, event);
+        Assertions.assertEquals("message2", event.getMessage());
+        Assertions.assertEquals(1, event.getCount());
+    }
+
     @Test
     public void testSameResourceNameWithDifferentUidNotShareEvents() {
         var flinkApp = TestUtils.buildApplicationCluster();

Reply via email to