This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new a5aca7c1 [FLINK-31716] Improve Event UID field handling
a5aca7c1 is described below
commit a5aca7c13e7b822fc82b798888e8d6d76eb6f15a
Author: Rodrigo <[email protected]>
AuthorDate: Wed Apr 19 01:03:52 2023 -0700
[FLINK-31716] Improve Event UID field handling
---
.../flink/kubernetes/operator/utils/EventUtils.java | 8 ++------
.../controller/FlinkDeploymentControllerTest.java | 19 +++++++++++++++----
.../kubernetes/operator/utils/EventUtilsTest.java | 7 ++++---
3 files changed, 21 insertions(+), 13 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
index 6769ce93..1e374127 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
@@ -76,8 +76,7 @@ public class EventUtils {
existing.setLastTimestamp(Instant.now().toString());
existing.setCount(existing.getCount() + 1);
existing.setMessage(message);
- client.resource(existing).createOrReplace();
- eventListener.accept(existing);
+ eventListener.accept(client.resource(existing).createOrReplace());
return false;
} else {
var event =
@@ -104,10 +103,7 @@ public class EventUtils {
.withNamespace(target.getMetadata().getNamespace())
.endMetadata()
.build();
-
- var ev = client.resource(event).createOrReplace();
- event.getMetadata().setUid(ev.getMetadata().getUid());
- eventListener.accept(event);
+ eventListener.accept(client.resource(event).createOrReplace());
return true;
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 2103b182..541da7aa 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
+import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.EventBuilder;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
@@ -85,6 +86,15 @@ public class FlinkDeploymentControllerTest {
private KubernetesMockServer mockServer;
private KubernetesClient kubernetesClient;
+ Event mockedEvent =
+ new EventBuilder()
+ .withNewMetadata()
+ .withName("name")
+ .endMetadata()
+ .withType("type")
+ .withReason("reason")
+ .build();
+
@BeforeEach
public void setup() {
flinkService = new TestingFlinkService(kubernetesClient);
@@ -217,9 +227,10 @@ public class FlinkDeploymentControllerTest {
@Test
public void verifyFailedDeployment() throws Exception {
+
var submittedEventValidatingResponseProvider =
new TestUtils.ValidatingResponseProvider<>(
- new EventBuilder().withNewMetadata().endMetadata().build(),
+ mockedEvent,
r ->
assertTrue(
r.getBody()
@@ -236,7 +247,7 @@ public class FlinkDeploymentControllerTest {
var validatingResponseProvider =
new TestUtils.ValidatingResponseProvider<>(
- new EventBuilder().withNewMetadata().endMetadata().build(),
+ mockedEvent,
r ->
assertTrue(
r.getBody()
@@ -301,7 +312,7 @@ public class FlinkDeploymentControllerTest {
var submittedEventValidatingResponseProvider =
new TestUtils.ValidatingResponseProvider<>(
- new EventBuilder().withNewMetadata().endMetadata().build(),
+ mockedEvent,
r ->
assertTrue(
r.getBody()
@@ -318,7 +329,7 @@ public class FlinkDeploymentControllerTest {
var validatingResponseProvider =
new TestUtils.ValidatingResponseProvider<>(
- new EventBuilder().withNewMetadata().endMetadata().build(),
+ mockedEvent,
r -> {
String recordedRequestBody =
r.getBody().readUtf8();
assertTrue(recordedRequestBody.contains(reason));
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
index d5c7dd9d..c40f324e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
@@ -71,12 +71,12 @@ public class EventUtilsTest {
.inNamespace(flinkApp.getMetadata().getNamespace())
.withName(eventName)
.get();
- Assertions.assertEquals(event.getMetadata().getUid(), eventConsumed.getMetadata().getUid());
- eventConsumed = null;
Assertions.assertNotNull(event);
+ Assertions.assertEquals(eventConsumed, event);
Assertions.assertEquals(1, event.getCount());
Assertions.assertEquals(reason, event.getReason());
+ eventConsumed = null;
Assertions.assertFalse(
EventUtils.createOrUpdateEvent(
kubernetesClient,
@@ -93,7 +93,8 @@ public class EventUtilsTest {
.inNamespace(flinkApp.getMetadata().getNamespace())
.withName(eventName)
.get();
- Assertions.assertEquals(event.getMetadata().getUid(), eventConsumed.getMetadata().getUid());
+ Assertions.assertNotNull(event);
+ Assertions.assertEquals(eventConsumed, event);
Assertions.assertEquals(2, event.getCount());
}