This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 305498a9 [FLINK-33187] Don't record duplicate event if no change
305498a9 is described below
commit 305498a9ab2e04ab71a4c2d87f2edb746373df1a
Author: clarax <[email protected]>
AuthorDate: Sun Oct 8 23:12:41 2023 -0700
[FLINK-33187] Don't record duplicate event if no change
---
.../generated/auto_scaler_configuration.html | 6 +
.../operator/autoscaler/ScalingExecutor.java | 26 ++-
.../autoscaler/config/AutoScalerOptions.java | 6 +
.../operator/autoscaler/ScalingExecutorTest.java | 38 +++-
.../kubernetes/operator/utils/EventRecorder.java | 22 +++
.../kubernetes/operator/utils/EventUtils.java | 92 ++++++++--
.../kubernetes/operator/utils/EventUtilsTest.java | 202 +++++++++++++++++++++
7 files changed, 362 insertions(+), 30 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 95af5541..8b9d02b7 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -92,6 +92,12 @@
<td>Boolean</td>
<td>Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.job.autoscaler.scaling.report.interval</h5></td>
+ <td style="word-wrap: break-word;">30 min</td>
+ <td>Duration</td>
+ <td>Time interval to resend the identical event</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.job.autoscaler.stabilization.interval</h5></td>
<td style="word-wrap: break-word;">5 min</td>
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index 1d6a9cd0..6367381c 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -94,17 +94,27 @@ public class ScalingExecutor {
var scalingEnabled = conf.get(SCALING_ENABLED);
var scalingReport = scalingReport(scalingSummaries, scalingEnabled);
- eventRecorder.triggerEvent(
- resource,
- EventRecorder.Type.Normal,
- EventRecorder.Reason.ScalingReport,
- EventRecorder.Component.Operator,
- scalingReport,
- "ScalingExecutor",
- client);
if (!scalingEnabled) {
+ eventRecorder.triggerEventByInterval(
+ resource,
+ EventRecorder.Type.Normal,
+ EventRecorder.Reason.ScalingReport,
+ EventRecorder.Component.Operator,
+ scalingReport,
+ "ScalingExecutor",
+ client,
+ conf.get(AutoScalerOptions.SCALING_REPORT_INTERVAL));
return false;
+ } else {
+ eventRecorder.triggerEvent(
+ resource,
+ EventRecorder.Type.Normal,
+ EventRecorder.Reason.ScalingReport,
+ EventRecorder.Component.Operator,
+ scalingReport,
+ "ScalingExecutor",
+ client);
}
scalingInformation.addToScalingHistory(clock.instant(), scalingSummaries, conf);
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
index df11e28a..276201dc 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -168,4 +168,10 @@ public class AutoScalerOptions {
.defaultValues()
.withDescription(
"A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+
+ public static final ConfigOption<Duration> SCALING_REPORT_INTERVAL =
+ autoScalerConfig("scaling.report.interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(1800))
+ .withDescription("Time interval to resend the identical event");
}
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index f3a26525..c315300b 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -169,16 +169,47 @@ public class ScalingExecutorTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testScalingEvents(boolean scalingEnabled) {
+ public void testScalingEventsWith0Interval(boolean scalingEnabled) {
+ testScalingEvents(scalingEnabled, Duration.ofSeconds(0));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testScalingEventsWithInterval(boolean scalingEnabled) {
+ testScalingEvents(scalingEnabled, Duration.ofSeconds(1800));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testScalingEventsWithDefaultInterval(boolean scalingEnabled) {
+ testScalingEvents(scalingEnabled, null);
+ }
+
+ private void testScalingEvents(boolean scalingEnabled, Duration interval) {
var jobVertexID = new JobVertexID();
conf.set(AutoScalerOptions.SCALING_ENABLED, scalingEnabled);
+
var metrics = Map.of(jobVertexID, evaluated(1, 110, 100));
+
+ if (interval != null) {
+ conf.set(AutoScalerOptions.SCALING_REPORT_INTERVAL, interval);
+ }
+
var scalingInfo = new AutoScalerInfo(new HashMap<>());
assertEquals(
scalingEnabled,
scalingDecisionExecutor.scaleResource(
flinkDep, scalingInfo, conf, metrics, kubernetesClient));
- assertEquals(1, eventCollector.events.size());
+ assertEquals(
+ scalingEnabled,
+ scalingDecisionExecutor.scaleResource(
+ flinkDep, scalingInfo, conf, metrics, kubernetesClient));
+ assertEquals(
+ (interval == null || (!interval.isNegative() && !interval.isZero()))
+ && !scalingEnabled
+ ? 1
+ : 2,
+ eventCollector.events.size());
var event = eventCollector.events.poll();
assertTrue(
event.getMessage()
@@ -200,6 +231,9 @@ public class ScalingExecutorTest {
assertEquals(EventRecorder.Reason.ScalingReport.name(), event.getReason());
metrics = Map.of(jobVertexID, evaluated(1, 110, 101));
+
+ assertEquals(1, event.getCount());
+
assertEquals(
scalingEnabled,
scalingDecisionExecutor.scaleResource(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index bc5eab3c..09c176f4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -28,6 +28,7 @@ import io.fabric8.kubernetes.client.KubernetesClient;
import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.Collection;
import java.util.function.BiConsumer;
@@ -112,6 +113,27 @@ public class EventRecorder {
messageKey);
}
+ public boolean triggerEventByInterval(
+ AbstractFlinkResource<?, ?> resource,
+ Type type,
+ Reason reason,
+ Component component,
+ String message,
+ @Nullable String messageKey,
+ KubernetesClient client,
+ Duration interval) {
+ return EventUtils.createByInterval(
+ client,
+ resource,
+ type,
+ reason.toString(),
+ message,
+ component,
+ e -> eventListener.accept(resource, e),
+ messageKey,
+ interval);
+ }
+
public boolean triggerEvent(
AbstractFlinkResource<?, ?> resource,
Type type,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
index e4b6cc7b..dfdbdd9e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
@@ -25,7 +25,9 @@ import io.fabric8.kubernetes.client.KubernetesClient;
import javax.annotation.Nullable;
+import java.time.Duration;
import java.time.Instant;
+import java.util.Objects;
import java.util.function.Consumer;
/**
@@ -61,24 +63,16 @@ public class EventUtils {
EventRecorder.Component component,
Consumer<Event> eventListener,
@Nullable String messageKey) {
-
- String eventName =
- generateEventName(
- target, type, reason, messageKey != null ? messageKey : message, component);
- Event existing = findExistingEvent(client, target, eventName);
-
- if (existing != null) {
- // update
- existing.setLastTimestamp(Instant.now().toString());
- existing.setCount(existing.getCount() + 1);
- existing.setMessage(message);
- eventListener.accept(client.resource(existing).createOrReplace());
- return false;
- } else {
- Event event = buildEvent(target, type, reason, message, component, eventName);
- eventListener.accept(client.resource(event).createOrReplace());
- return true;
- }
+ return createByInterval(
+ client,
+ target,
+ type,
+ reason,
+ message,
+ component,
+ eventListener,
+ messageKey,
+ Duration.ofSeconds(0));
}
private static Event findExistingEvent(
@@ -108,12 +102,70 @@ public class EventUtils {
if (existing != null) {
return false;
} else {
- Event event = buildEvent(target, type, reason, message, component, eventName);
- eventListener.accept(client.resource(event).createOrReplace());
+ createNewEvent(
+ client, target, type, reason, message, component, eventListener, eventName);
+ return true;
+ }
+ }
+
+ public static boolean createByInterval(
+ KubernetesClient client,
+ HasMetadata target,
+ EventRecorder.Type type,
+ String reason,
+ String message,
+ EventRecorder.Component component,
+ Consumer<Event> eventListener,
+ @Nullable String messageKey,
+ Duration interval) {
+
+ String eventName =
+ generateEventName(
+ target, type, reason, messageKey != null ? messageKey : message, component);
+ Event existing = findExistingEvent(client, target, eventName);
+
+ if (existing != null) {
+ if (Objects.equals(existing.getMessage(), message)
+ && Instant.now()
+ .isBefore(
+ Instant.parse(existing.getLastTimestamp())
+ .plusMillis(interval.toMillis()))) {
+ return false;
+ } else {
+ createUpdatedEvent(existing, client, message, eventListener);
+ return false;
+ }
+ } else {
+ createNewEvent(
+ client, target, type, reason, message, component, eventListener, eventName);
return true;
}
}
+ private static void createUpdatedEvent(
+ Event existing,
+ KubernetesClient client,
+ String message,
+ Consumer<Event> eventListener) {
+ existing.setLastTimestamp(Instant.now().toString());
+ existing.setCount(existing.getCount() + 1);
+ existing.setMessage(message);
+ eventListener.accept(client.resource(existing).createOrReplace());
+ }
+
+ private static void createNewEvent(
+ KubernetesClient client,
+ HasMetadata target,
+ EventRecorder.Type type,
+ String reason,
+ String message,
+ EventRecorder.Component component,
+ Consumer<Event> eventListener,
+ String eventName) {
+ Event event = buildEvent(target, type, reason, message, component, eventName);
+ eventListener.accept(client.resource(event).createOrReplace());
+ }
+
private static Event buildEvent(
HasMetadata target,
EventRecorder.Type type,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
index 168ec2f5..c58b020b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
@@ -26,6 +26,7 @@ import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.time.Duration;
import java.util.function.Consumer;
/** Test for {@link EventUtils}. */
@@ -111,6 +112,84 @@ public class EventUtilsTest {
null));
}
+ @Test
+ public void testCreateUpdatedEvent() {
+ var consumer =
+ new Consumer<Event>() {
+ @Override
+ public void accept(Event event) {
+ eventConsumed = event;
+ }
+ };
+ var flinkApp = TestUtils.buildApplicationCluster();
+ var reason = "Cleanup";
+ var message = "message";
+ var eventName =
+ EventUtils.generateEventName(
+ flinkApp,
+ EventRecorder.Type.Warning,
+ reason,
+ message,
+ EventRecorder.Component.Operator);
+ Assertions.assertTrue(
+ EventUtils.createByInterval(
+ kubernetesClient,
+ flinkApp,
+ EventRecorder.Type.Warning,
+ reason,
+ message,
+ EventRecorder.Component.Operator,
+ consumer,
+ null,
+ Duration.ofSeconds(1800)));
+ var event =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(flinkApp.getMetadata().getNamespace())
+ .withName(eventName)
+ .get();
+ Assertions.assertNotNull(event);
+ Assertions.assertEquals(eventConsumed, event);
+ Assertions.assertEquals(1, event.getCount());
+ Assertions.assertEquals(reason, event.getReason());
+
+ eventConsumed = null;
+ Assertions.assertFalse(
+ EventUtils.createByInterval(
+ kubernetesClient,
+ flinkApp,
+ EventRecorder.Type.Warning,
+ reason,
+ message,
+ EventRecorder.Component.Operator,
+ consumer,
+ null,
+ Duration.ofSeconds(1800)));
+ event =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(flinkApp.getMetadata().getNamespace())
+ .withName(eventName)
+ .get();
+ Assertions.assertNotNull(event);
+ Assertions.assertNull(eventConsumed);
+ Assertions.assertEquals(1, event.getCount());
+
+ Assertions.assertTrue(
+ EventUtils.createByInterval(
+ kubernetesClient,
+ flinkApp,
+ EventRecorder.Type.Warning,
+ reason,
+ null,
+ EventRecorder.Component.Operator,
+ consumer,
+ null,
+ Duration.ofSeconds(1800)));
+ }
+
@Test
public void testCreateWithMessageKey() {
var consumer =
@@ -174,6 +253,129 @@ public class EventUtilsTest {
Assertions.assertEquals(2, event.getCount());
}
+ @Test
+ public void testCreateByIntervalWithMessageKey() {
+ var consumer =
+ new Consumer<Event>() {
+ @Override
+ public void accept(Event event) {
+ eventConsumed = event;
+ }
+ };
+ var flinkApp = TestUtils.buildApplicationCluster();
+ var reason = "Cleanup";
+ var eventName =
+ EventUtils.generateEventName(
+ flinkApp,
+ EventRecorder.Type.Warning,
+ reason,
+ "mk",
+ EventRecorder.Component.Operator);
+
+ eventConsumed = null;
+ Assertions.assertTrue(
+ EventUtils.createByInterval(
+ kubernetesClient,
+ flinkApp,
+ EventRecorder.Type.Warning,
+ reason,
+ "message1",
+ EventRecorder.Component.Operator,
+ consumer,
+ "mk",
+ Duration.ofSeconds(1800)));
+ var event =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(flinkApp.getMetadata().getNamespace())
+ .withName(eventName)
+ .get();
+ Assertions.assertNotNull(event);
+ Assertions.assertEquals(eventConsumed, event);
+ Assertions.assertEquals("message1", event.getMessage());
+ Assertions.assertEquals(1, event.getCount());
+
+ eventConsumed = null;
+ Assertions.assertFalse(
+ EventUtils.createByInterval(
+ kubernetesClient,
+ flinkApp,
+ EventRecorder.Type.Warning,
+ reason,
+ "message2",
+ EventRecorder.Component.Operator,
+ consumer,
+ "mk",
+ Duration.ofSeconds(1800)));
+
+ event =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(flinkApp.getMetadata().getNamespace())
+ .withName(eventName)
+ .get();
+ Assertions.assertNotNull(event);
+ Assertions.assertEquals(eventConsumed, event);
+ Assertions.assertEquals("message2", event.getMessage());
+ Assertions.assertEquals(2, event.getCount());
+
+ eventConsumed = null;
+ Assertions.assertFalse(
+ EventUtils.createByInterval(
+ kubernetesClient,
+ flinkApp,
+ EventRecorder.Type.Warning,
+ reason,
+ "message2",
+ EventRecorder.Component.Operator,
+ consumer,
+ "mk",
+ Duration.ofSeconds(1800)));
+
+ event =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(flinkApp.getMetadata().getNamespace())
+ .withName(eventName)
+ .get();
+ Assertions.assertNotNull(event);
+ Assertions.assertNull(eventConsumed);
+
+ eventConsumed = null;
+ Assertions.assertTrue(
+ EventUtils.createByInterval(
+ kubernetesClient,
+ flinkApp,
+ EventRecorder.Type.Warning,
+ reason,
+ "message2",
+ EventRecorder.Component.Operator,
+ consumer,
+ "mk2",
+ Duration.ofSeconds(1800)));
+ eventName =
+ EventUtils.generateEventName(
+ flinkApp,
+ EventRecorder.Type.Warning,
+ reason,
+ "mk2",
+ EventRecorder.Component.Operator);
+ event =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(flinkApp.getMetadata().getNamespace())
+ .withName(eventName)
+ .get();
+ Assertions.assertNotNull(event);
+ Assertions.assertEquals(eventConsumed, event);
+ Assertions.assertEquals("message2", event.getMessage());
+ Assertions.assertEquals(1, event.getCount());
+ }
+
@Test
public void testSameResourceNameWithDifferentUidNotShareEvents() {
var flinkApp = TestUtils.buildApplicationCluster();