This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 57974b61 [FLINK-32147] Deduplicate scaling report messages
57974b61 is described below
commit 57974b61590e1976785fbdd4a40dafa975fc9521
Author: Gyula Fora <[email protected]>
AuthorDate: Fri May 19 16:42:03 2023 +0200
[FLINK-32147] Deduplicate scaling report messages
---
.../operator/autoscaler/ScalingExecutor.java | 3 +-
.../operator/autoscaler/ScalingExecutorTest.java | 8 +++
.../kubernetes/operator/utils/EventRecorder.java | 29 ++++++++-
.../kubernetes/operator/utils/EventUtils.java | 11 +++-
.../kubernetes/operator/utils/EventUtilsTest.java | 72 +++++++++++++++++++++-
5 files changed, 114 insertions(+), 9 deletions(-)
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index bc9d33e9..15d9dcba 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -119,7 +119,8 @@ public class ScalingExecutor {
EventRecorder.Type.Normal,
EventRecorder.Reason.ScalingReport,
EventRecorder.Component.Operator,
- scalingReport);
+ scalingReport,
+ "ScalingExecutor");
if (!scalingEnabled) {
return false;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index 71103e0e..b38059a4 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -242,6 +242,14 @@ public class ScalingExecutorTest {
? SCALING_SUMMARY_HEADER_SCALING_ENABLED
: SCALING_SUMMARY_HEADER_SCALING_DISABLED));
assertEquals(EventRecorder.Reason.ScalingReport.name(), event.getReason());
+
+ metrics = Map.of(jobVertexID, evaluated(1, 110, 101));
+ assertEquals(
+ scalingEnabled,
+ scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+ var event2 = eventCollector.events.poll();
+ assertEquals(event.getMetadata().getUid(), event2.getMetadata().getUid());
+ assertEquals(2, event2.getCount());
}
private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index bcf73afb..28903419 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -26,6 +26,8 @@ import org.apache.flink.kubernetes.operator.listener.AuditUtils;
import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.client.KubernetesClient;
+import javax.annotation.Nullable;
+
import java.util.Collection;
import java.util.function.BiConsumer;
@@ -47,7 +49,17 @@ public class EventRecorder {
Reason reason,
Component component,
String message) {
- return triggerEvent(resource, type, reason.toString(), message, component);
+ return triggerEvent(resource, type, reason, component, message, null);
+ }
+
+ public boolean triggerEvent(
+ AbstractFlinkResource<?, ?> resource,
+ Type type,
+ Reason reason,
+ Component component,
+ String message,
+ @Nullable String messageKey) {
+ return triggerEvent(resource, type, reason.toString(), message, component, messageKey);
}
public boolean triggerEvent(
@@ -55,7 +67,8 @@ public class EventRecorder {
Type type,
String reason,
String message,
- Component component) {
+ Component component,
+ String messageKey) {
return EventUtils.createOrUpdateEvent(
client,
resource,
@@ -63,7 +76,17 @@ public class EventRecorder {
reason,
message,
component,
- e -> eventListener.accept(resource, e));
+ e -> eventListener.accept(resource, e),
+ messageKey);
+ }
+
+ public boolean triggerEvent(
+ AbstractFlinkResource<?, ?> resource,
+ Type type,
+ String reason,
+ String message,
+ Component component) {
+ return triggerEvent(resource, type, reason, message, component, null);
}
public static EventRecorder create(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
index 1e374127..5ed40ed3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
@@ -23,6 +23,8 @@ import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
+import javax.annotation.Nullable;
+
import java.time.Instant;
import java.util.function.Consumer;
@@ -57,8 +59,13 @@ public class EventUtils {
String reason,
String message,
EventRecorder.Component component,
- Consumer<Event> eventListener) {
- var eventName = generateEventName(target, type, reason, message, component);
+ Consumer<Event> eventListener,
+ @Nullable String messageKey) {
+
+ if (messageKey == null) {
+ messageKey = message;
+ }
+ var eventName = generateEventName(target, type, reason, messageKey, component);
var existing =
client.v1()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
index 60fa8470..7ee52cdc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
@@ -63,7 +63,8 @@ public class EventUtilsTest {
reason,
message,
EventRecorder.Component.Operator,
- consumer));
+ consumer,
+ null));
var event =
kubernetesClient
.v1()
@@ -85,7 +86,8 @@ public class EventUtilsTest {
reason,
message,
EventRecorder.Component.Operator,
- consumer));
+ consumer,
+ null));
event =
kubernetesClient
.v1()
@@ -105,7 +107,71 @@ public class EventUtilsTest {
reason,
null,
EventRecorder.Component.Operator,
- consumer));
+ consumer,
+ null));
+ }
+
+ @Test
+ public void testCreateWithMessageKey() {
+ var consumer =
+ new Consumer<Event>() {
+ @Override
+ public void accept(Event event) {
+ eventConsumed = event;
+ }
+ };
+ var flinkApp = TestUtils.buildApplicationCluster();
+ var reason = "Cleanup";
+ var eventName =
+ EventUtils.generateEventName(
+ flinkApp,
+ EventRecorder.Type.Warning,
+ reason,
+ "mk",
+ EventRecorder.Component.Operator);
+
+ Assertions.assertTrue(
+ EventUtils.createOrUpdateEvent(
+ kubernetesClient,
+ flinkApp,
+ EventRecorder.Type.Warning,
+ reason,
+ "message1",
+ EventRecorder.Component.Operator,
+ consumer,
+ "mk"));
+ var event =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(flinkApp.getMetadata().getNamespace())
+ .withName(eventName)
+ .get();
+ Assertions.assertNotNull(event);
+ Assertions.assertEquals("message1", event.getMessage());
+ Assertions.assertEquals(1, event.getCount());
+
+ Assertions.assertFalse(
+ EventUtils.createOrUpdateEvent(
+ kubernetesClient,
+ flinkApp,
+ EventRecorder.Type.Warning,
+ reason,
+ "message2",
+ EventRecorder.Component.Operator,
+ consumer,
+ "mk"));
+
+ event =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(flinkApp.getMetadata().getNamespace())
+ .withName(eventName)
+ .get();
+ Assertions.assertNotNull(event);
+ Assertions.assertEquals("message2", event.getMessage());
+ Assertions.assertEquals(2, event.getCount());
}
@Test