This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new c06094c  [FLINK-28534] Spec change event is triggered twice per upgrade
c06094c is described below

commit c06094c09a37ba3033922c073eb8d08d5a66ac87
Author: Matyas Orhidi <[email protected]>
AuthorDate: Wed Jul 13 12:44:42 2022 +0200

    [FLINK-28534] Spec change event is triggered twice per upgrade
---
 .../AbstractFlinkResourceReconciler.java           | 14 ++--
 .../controller/FlinkDeploymentControllerTest.java  | 84 +++++++++++++++-------
 .../TestingFlinkDeploymentController.java          | 20 +++++-
 3 files changed, 86 insertions(+), 32 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 903b4e1..b0d8abe 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -134,12 +134,14 @@ public abstract class AbstractFlinkResourceReconciler<
                 return;
             }
             LOG.info(MSG_SPEC_CHANGED);
-            eventRecorder.triggerEvent(
-                    cr,
-                    EventRecorder.Type.Normal,
-                    EventRecorder.Reason.SpecChanged,
-                    EventRecorder.Component.JobManagerDeployment,
-                    MSG_SPEC_CHANGED);
+            if (reconciliationStatus.getState() != ReconciliationState.UPGRADING) {
+                eventRecorder.triggerEvent(
+                        cr,
+                        EventRecorder.Type.Normal,
+                        EventRecorder.Reason.SpecChanged,
+                        EventRecorder.Component.JobManagerDeployment,
+                        MSG_SPEC_CHANGED);
+            }
             reconcileSpecChange(cr, observeConfig, deployConfig);
         } else if (shouldRollBack(cr, observeConfig)) {
             // Rollbacks are executed in two steps, we initiate it first then return
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 54e0ace..866ecbd 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -24,14 +24,12 @@ import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
@@ -39,6 +37,7 @@ import org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo;
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
@@ -62,7 +61,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED;
@@ -91,7 +89,6 @@ public class FlinkDeploymentControllerTest {
     public void setup() {
         flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
-
         testController =
                 new TestingFlinkDeploymentController(configManager, kubernetesClient, flinkService);
         kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
@@ -125,7 +122,6 @@ public class FlinkDeploymentControllerTest {
                                 .getProgressCheckInterval()
                                 .toMillis()),
                 updateControl.getScheduleDelay());
-
         // Validate reconciliation status
         ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus =
                 appCluster.getStatus().getReconciliationStatus();
@@ -376,7 +372,6 @@ public class FlinkDeploymentControllerTest {
                 .put(
                         CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
                         "file:///flink-data/savepoints");
-
         testController.reconcile(appCluster, context);
         List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
@@ -461,6 +456,7 @@ public class FlinkDeploymentControllerTest {
     @ParameterizedTest
     @EnumSource(FlinkVersion.class)
     public void verifyStatelessUpgrade(FlinkVersion flinkVersion) throws Exception {
+        testController.events().clear();
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion);
         appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
         appCluster.getSpec().getJob().setInitialSavepointPath("s0");
@@ -469,15 +465,32 @@ public class FlinkDeploymentControllerTest {
         List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals("s0", jobs.get(0).f0);
-
+        assertEquals(1, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.Submit,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
         testController.reconcile(appCluster, context);
         testController.reconcile(appCluster, context);
 
+        assertEquals(1, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.StatusChanged,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+
         // Upgrade job
         appCluster.getSpec().getJob().setParallelism(100);
 
         UpdateControl<FlinkDeployment> updateControl =
                 testController.reconcile(appCluster, context);
+
+        assertEquals(2, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.SpecChanged,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+        assertEquals(
+                EventRecorder.Reason.Suspended,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+
         assertEquals(0, updateControl.getScheduleDelay().get());
         assertEquals(
                 JobState.SUSPENDED,
@@ -489,6 +502,11 @@ public class FlinkDeploymentControllerTest {
                         .getState());
 
         updateControl = testController.reconcile(appCluster, context);
+        assertEquals(1, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.Submit,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYING
                         .rescheduleAfter(appCluster, configManager.getOperatorConfiguration())
@@ -502,14 +520,33 @@ public class FlinkDeploymentControllerTest {
 
         testController.reconcile(appCluster, context);
         testController.reconcile(appCluster, context);
+        assertEquals(1, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.StatusChanged,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
 
         // Suspend job
         appCluster.getSpec().getJob().setState(JobState.SUSPENDED);
         testController.reconcile(appCluster, context);
 
+        assertEquals(2, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.SpecChanged,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+        assertEquals(
+                EventRecorder.Reason.Suspended,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+
         // Resume from empty state
         appCluster.getSpec().getJob().setState(JobState.RUNNING);
         testController.reconcile(appCluster, context);
+        assertEquals(2, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.SpecChanged,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+        assertEquals(
+                EventRecorder.Reason.Submit,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
         jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals(null, jobs.get(0).f0);
@@ -517,6 +554,13 @@ public class FlinkDeploymentControllerTest {
         // Inject validation error in the middle of the upgrade
         appCluster.getSpec().setRestartNonce(123L);
         testController.reconcile(appCluster, context);
+        assertEquals(2, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.SpecChanged,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+        assertEquals(
+                EventRecorder.Reason.Suspended,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
         assertEquals(
                 JobState.SUSPENDED,
                 appCluster
@@ -527,8 +571,16 @@ public class FlinkDeploymentControllerTest {
                         .getState());
         appCluster.getSpec().setLogConfiguration(Map.of("invalid", "conf"));
         testController.reconcile(appCluster, TestUtils.createEmptyContext());
+        assertEquals(1, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.Submit,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
         testController.reconcile(appCluster, context);
         testController.reconcile(appCluster, context);
+        assertEquals(1, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.StatusChanged,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
 
         assertEquals(
                 JobManagerDeploymentStatus.READY,
@@ -890,22 +942,4 @@ public class FlinkDeploymentControllerTest {
         }
         return args.stream();
     }
-
-    private static class StatusUpdateCounter
-            implements BiConsumer<
-                    AbstractFlinkResource<?, FlinkDeploymentStatus>, FlinkDeploymentStatus> {
-        private int counter;
-
-        @Override
-        public void accept(
-                AbstractFlinkResource<?, FlinkDeploymentStatus>
-                        flinkDeploymentStatusAbstractFlinkResource,
-                FlinkDeploymentStatus flinkDeploymentStatus) {
-            counter++;
-        }
-
-        public int getCount() {
-            return counter;
-        }
-    }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 68a29f7..2def2b4 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -30,6 +30,7 @@ import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
 
+import io.fabric8.kubernetes.api.model.Event;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -43,7 +44,9 @@ import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import io.javaoperatorsdk.operator.processing.event.source.EventSource;
 import org.junit.jupiter.api.Assertions;
 
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.Queue;
 import java.util.function.BiConsumer;
 
 /** A wrapper around {@link FlinkDeploymentController} used by unit tests. */
@@ -55,6 +58,8 @@ public class TestingFlinkDeploymentController
 
     private FlinkDeploymentController flinkDeploymentController;
     private StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter();
+    private EventCollector eventCollector = new EventCollector();
+
     private EventRecorder eventRecorder;
     private StatusRecorder statusRecorder;
 
@@ -62,7 +67,7 @@ public class TestingFlinkDeploymentController
             FlinkConfigManager configManager,
             KubernetesClient kubernetesClient,
             TestingFlinkService flinkService) {
-        eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
+        eventRecorder = new EventRecorder(kubernetesClient, eventCollector);
         statusRecorder =
                 new StatusRecorder<>(
                         kubernetesClient,
@@ -117,6 +122,19 @@ public class TestingFlinkDeploymentController
         throw new UnsupportedOperationException();
     }
 
+    private static class EventCollector implements BiConsumer<AbstractFlinkResource<?, ?>, Event> {
+        private Queue<Event> events = new LinkedList<>();
+
+        @Override
+        public void accept(AbstractFlinkResource<?, ?> abstractFlinkResource, Event event) {
+            events.add(event);
+        }
+    }
+
+    public Queue<Event> events() {
+        return eventCollector.events;
+    }
+
     private static class StatusUpdateCounter
             implements BiConsumer<
                     AbstractFlinkResource<?, FlinkDeploymentStatus>, FlinkDeploymentStatus> {

Reply via email to