This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new a909b52  [FLINK-26135] Introduce ReconciliationStatus and improve error handling in controller flow
a909b52 is described below

commit a909b5227b9131b4822ef281be1e282c4f8b700f
Author: Gyula Fora <[email protected]>
AuthorDate: Wed Feb 16 16:28:14 2022 +0100

    [FLINK-26135] Introduce ReconciliationStatus and improve error handling in controller flow
    
    Closes #5
---
 .../controller/FlinkDeploymentController.java      |  82 +++++++++++----
 .../operator/crd/status/FlinkDeploymentStatus.java |   4 +-
 ...oymentStatus.java => ReconciliationStatus.java} |  11 +-
 .../InvalidDeploymentException.java}               |  21 ++--
 .../operator/observer/JobStatusObserver.java       |   8 +-
 .../operator/reconciler/JobReconciler.java         |  45 ++++----
 .../operator/reconciler/SessionReconciler.java     |  32 +++---
 .../flink/kubernetes/operator/TestUtils.java       |   2 +
 .../controller/FlinkDeploymentControllerTest.java  | 113 +++++++++++++++++++++
 .../operator/observer/JobStatusObserverTest.java   |  11 +-
 .../operator/reconciler/JobReconcilerTest.java     |   5 +-
 11 files changed, 248 insertions(+), 86 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 09eb536..dbafd7d 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -19,6 +19,8 @@ package org.apache.flink.kubernetes.operator.controller;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
@@ -51,7 +53,9 @@ public class FlinkDeploymentController
                 ErrorStatusHandler<FlinkDeployment>,
                 EventSourceInitializer<FlinkDeployment> {
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDeploymentController.class);
-    private static final int JOB_REFRESH_SECONDS = 5;
+
+    public static final int OBSERVE_REFRESH_SECONDS = 10;
+    public static final int RECONCILE_ERROR_REFRESH_SECONDS = 5;
 
     private final KubernetesClient kubernetesClient;
 
@@ -90,37 +94,75 @@ public class FlinkDeploymentController
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, 
Context context) {
         LOG.info("Reconciling {}", flinkApp.getMetadata().getName());
+        if (flinkApp.getStatus() == null) {
+            flinkApp.setStatus(new FlinkDeploymentStatus());
+        }
 
         Configuration effectiveConfig = 
FlinkUtils.getEffectiveConfig(flinkApp);
 
-        boolean success = observer.observeFlinkJobStatus(flinkApp, 
effectiveConfig);
-        if (success) {
-            try {
-                success = reconcileFlinkDeployment(operatorNamespace, 
flinkApp, effectiveConfig);
-            } catch (Exception e) {
-                throw new RuntimeException(
-                        "Error while reconciling deployment change for "
-                                + flinkApp.getMetadata().getName(),
-                        e);
-            }
-        }
+        boolean successfulObserve = observer.observeFlinkJobStatus(flinkApp, 
effectiveConfig);
 
-        if (!success) {
+        if (!successfulObserve) {
+            // Cluster not accessible let's retry
             return UpdateControl.<FlinkDeployment>noUpdate()
-                    .rescheduleAfter(JOB_REFRESH_SECONDS, TimeUnit.SECONDS);
+                    .rescheduleAfter(OBSERVE_REFRESH_SECONDS, 
TimeUnit.SECONDS);
+        }
+
+        if (!specChanged(flinkApp)) {
+            // Successfully observed the cluster after reconciliation, no need 
to reschedule
+            return UpdateControl.updateStatus(flinkApp);
+        }
+
+        try {
+            reconcileFlinkDeployment(operatorNamespace, flinkApp, 
effectiveConfig);
+        } catch (Exception e) {
+            String err = "Error while reconciling deployment change: " + 
e.getMessage();
+            String lastErr = 
flinkApp.getStatus().getReconciliationStatus().getError();
+            if (!err.equals(lastErr)) {
+                // Log new errors on the first instance
+                LOG.error("Error while reconciling deployment change", e);
+                updateForReconciliationError(flinkApp, err);
+                return UpdateControl.updateStatus(flinkApp)
+                        .rescheduleAfter(RECONCILE_ERROR_REFRESH_SECONDS, 
TimeUnit.SECONDS);
+            } else {
+                return UpdateControl.<FlinkDeployment>noUpdate()
+                        .rescheduleAfter(RECONCILE_ERROR_REFRESH_SECONDS, 
TimeUnit.SECONDS);
+            }
         }
 
-        flinkApp.getStatus().setSpec(flinkApp.getSpec());
+        // Everything went well, update status and reschedule for observation
+        updateForReconciliationSuccess(flinkApp);
         return UpdateControl.updateStatus(flinkApp)
-                .rescheduleAfter(JOB_REFRESH_SECONDS, TimeUnit.SECONDS);
+                .rescheduleAfter(OBSERVE_REFRESH_SECONDS, TimeUnit.SECONDS);
     }
 
-    private boolean reconcileFlinkDeployment(
+    private boolean specChanged(FlinkDeployment flinkApp) {
+        return !flinkApp.getSpec()
+                
.equals(flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec());
+    }
+
+    private void reconcileFlinkDeployment(
             String operatorNamespace, FlinkDeployment flinkApp, Configuration 
effectiveConfig)
             throws Exception {
-        return flinkApp.getSpec().getJob() == null
-                ? sessionReconciler.reconcile(operatorNamespace, flinkApp, 
effectiveConfig)
-                : jobReconciler.reconcile(operatorNamespace, flinkApp, 
effectiveConfig);
+
+        if (flinkApp.getSpec().getJob() == null) {
+            sessionReconciler.reconcile(operatorNamespace, flinkApp, 
effectiveConfig);
+        } else {
+            jobReconciler.reconcile(operatorNamespace, flinkApp, 
effectiveConfig);
+        }
+    }
+
+    private void updateForReconciliationSuccess(FlinkDeployment flinkApp) {
+        ReconciliationStatus reconciliationStatus = 
flinkApp.getStatus().getReconciliationStatus();
+        reconciliationStatus.setSuccess(true);
+        reconciliationStatus.setError(null);
+        reconciliationStatus.setLastReconciledSpec(flinkApp.getSpec());
+    }
+
+    private void updateForReconciliationError(FlinkDeployment flinkApp, String 
err) {
+        ReconciliationStatus reconciliationStatus = 
flinkApp.getStatus().getReconciliationStatus();
+        reconciliationStatus.setSuccess(false);
+        reconciliationStatus.setError(err);
     }
 
     @Override
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
index 041bd0a..928c827 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.kubernetes.operator.crd.status;
 
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -29,5 +27,5 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor
 public class FlinkDeploymentStatus {
     private JobStatus jobStatus;
-    private FlinkDeploymentSpec spec;
+    private ReconciliationStatus reconciliationStatus = new 
ReconciliationStatus();
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
similarity index 82%
copy from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
copy to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
index 041bd0a..22dc565 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
@@ -20,14 +20,17 @@ package org.apache.flink.kubernetes.operator.crd.status;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
-/** Current status of the Flink deployment. */
+/** Status of the Flink deployment reconciliation flow. */
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
-public class FlinkDeploymentStatus {
-    private JobStatus jobStatus;
-    private FlinkDeploymentSpec spec;
+@Builder
+public class ReconciliationStatus {
+    private boolean success;
+    private String error;
+    private FlinkDeploymentSpec lastReconciledSpec;
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/InvalidDeploymentException.java
similarity index 65%
copy from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
copy to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/InvalidDeploymentException.java
index 041bd0a..9ce408e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/InvalidDeploymentException.java
@@ -15,19 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.crd.status;
+package org.apache.flink.kubernetes.operator.exception;
 
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+/** Exception for encountering invalid FlinkDeployment resources. */
+public class InvalidDeploymentException extends Exception {
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+    public InvalidDeploymentException(String msg) {
+        super(msg);
+    }
 
-/** Current status of the Flink deployment. */
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class FlinkDeploymentStatus {
-    private JobStatus jobStatus;
-    private FlinkDeploymentSpec spec;
+    public InvalidDeploymentException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index 115d7f5..6297365 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.observer;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
@@ -46,12 +47,15 @@ public class JobStatusObserver {
     }
 
     public boolean observeFlinkJobStatus(FlinkDeployment flinkApp, 
Configuration effectiveConfig) {
-        if (flinkApp.getStatus() == null) {
+        FlinkDeploymentSpec lastReconciledSpec =
+                
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        if (lastReconciledSpec == null) {
             // This is the first run, nothing to observe
             return true;
         }
 
-        JobSpec jobSpec = flinkApp.getStatus().getSpec().getJob();
+        JobSpec jobSpec = lastReconciledSpec.getJob();
 
         if (jobSpec == null) {
             // This is a session cluster, nothing to observe
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 564efc9..882c58a 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -20,11 +20,12 @@ package org.apache.flink.kubernetes.operator.reconciler;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import 
org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -51,36 +52,32 @@ public class JobReconciler {
         this.flinkService = flinkService;
     }
 
-    public boolean reconcile(
+    public void reconcile(
             String operatorNamespace, FlinkDeployment flinkApp, Configuration 
effectiveConfig)
             throws Exception {
 
+        FlinkDeploymentSpec lastReconciledSpec =
+                
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
         JobSpec jobSpec = flinkApp.getSpec().getJob();
-        if (flinkApp.getStatus() == null) {
-            flinkApp.setStatus(new FlinkDeploymentStatus());
+        if (lastReconciledSpec == null) {
             if (!jobSpec.getState().equals(JobState.RUNNING)) {
-                throw new RuntimeException("Job must start in running state");
-            }
-            try {
-                deployFlinkJob(
-                        flinkApp,
-                        effectiveConfig,
-                        
Optional.ofNullable(jobSpec.getInitialSavepointPath()));
-                IngressUtils.updateIngressRules(
-                        flinkApp, effectiveConfig, operatorNamespace, 
kubernetesClient, false);
-                return true;
-            } catch (Exception e) {
-                LOG.error("Error while deploying " + 
flinkApp.getMetadata().getName(), e);
-                return false;
+                throw new InvalidDeploymentException("Job must start in 
running state");
             }
+            deployFlinkJob(
+                    flinkApp,
+                    effectiveConfig,
+                    Optional.ofNullable(jobSpec.getInitialSavepointPath()));
+            IngressUtils.updateIngressRules(
+                    flinkApp, effectiveConfig, operatorNamespace, 
kubernetesClient, false);
+            return;
         }
 
-        boolean specChanged = 
!flinkApp.getSpec().equals(flinkApp.getStatus().getSpec());
+        boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
         if (specChanged) {
-            if (flinkApp.getStatus().getSpec().getJob() == null) {
-                throw new RuntimeException("Cannot switch from session to job 
cluster");
+            if (lastReconciledSpec.getJob() == null) {
+                throw new InvalidDeploymentException("Cannot switch from 
session to job cluster");
             }
-            JobState currentJobState = 
flinkApp.getStatus().getSpec().getJob().getState();
+            JobState currentJobState = lastReconciledSpec.getJob().getState();
             JobState desiredJobState = jobSpec.getState();
 
             UpgradeMode upgradeMode = jobSpec.getUpgradeMode();
@@ -103,13 +100,12 @@ public class JobReconciler {
                     } else if (upgradeMode == UpgradeMode.SAVEPOINT) {
                         restoreFromLastSavepoint(flinkApp, effectiveConfig);
                     } else {
-                        throw new UnsupportedOperationException(
+                        throw new InvalidDeploymentException(
                                 "Only savepoint and stateless strategies are 
supported at the moment.");
                     }
                 }
             }
         }
-        return true;
     }
 
     private void deployFlinkJob(
@@ -136,7 +132,8 @@ public class JobReconciler {
 
         String savepointLocation = jobStatus.getSavepointLocation();
         if (savepointLocation == null) {
-            throw new RuntimeException("Cannot perform stateful restore 
without a valid savepoint");
+            throw new InvalidDeploymentException(
+                    "Cannot perform stateful restore without a valid 
savepoint");
         }
         deployFlinkJob(flinkApp, effectiveConfig, 
Optional.of(savepointLocation));
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index 0c7bae5..2e36fc5 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -19,7 +19,8 @@ package org.apache.flink.kubernetes.operator.reconciler;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import 
org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 
@@ -43,31 +44,28 @@ public class SessionReconciler {
         this.flinkService = flinkService;
     }
 
-    public boolean reconcile(
+    public void reconcile(
             String operatorNamespace, FlinkDeployment flinkApp, Configuration 
effectiveConfig)
             throws Exception {
-        if (flinkApp.getStatus() == null) {
-            flinkApp.setStatus(new FlinkDeploymentStatus());
-            try {
-                flinkService.submitSessionCluster(flinkApp, effectiveConfig);
-                IngressUtils.updateIngressRules(
-                        flinkApp, effectiveConfig, operatorNamespace, 
kubernetesClient, false);
-                return true;
-            } catch (Exception e) {
-                LOG.error("Error while deploying " + 
flinkApp.getMetadata().getName(), e);
-                return false;
-            }
+
+        FlinkDeploymentSpec lastReconciledSpec =
+                
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        if (lastReconciledSpec == null) {
+            flinkService.submitSessionCluster(flinkApp, effectiveConfig);
+            IngressUtils.updateIngressRules(
+                    flinkApp, effectiveConfig, operatorNamespace, 
kubernetesClient, false);
+            return;
         }
 
-        boolean specChanged = 
!flinkApp.getSpec().equals(flinkApp.getStatus().getSpec());
+        boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
 
         if (specChanged) {
-            if (flinkApp.getStatus().getSpec().getJob() != null) {
-                throw new RuntimeException("Cannot switch from job to session 
cluster");
+            if (lastReconciledSpec.getJob() != null) {
+                throw new InvalidDeploymentException("Cannot switch from job 
to session cluster");
             }
             upgradeSessionCluster(flinkApp, effectiveConfig);
         }
-        return true;
     }
 
     private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration 
effectiveConfig)
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 4ef6860..6f27b9b 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.Resource;
 import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -44,6 +45,7 @@ public class TestUtils {
 
     public static FlinkDeployment buildSessionCluster() {
         FlinkDeployment deployment = new FlinkDeployment();
+        deployment.setStatus(new FlinkDeploymentStatus());
         deployment.setMetadata(
                 new ObjectMetaBuilder()
                         .withName("test-cluster")
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
new file mode 100644
index 0000000..3b4ae63
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** {@link FlinkDeploymentController} unit tests. */
+public class FlinkDeploymentControllerTest {
+
+    private final TestingFlinkService flinkService = new TestingFlinkService();
+
+    @Test
+    public void verifyBasicReconcileLoop() {
+        FlinkDeploymentController testController = createTestController();
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+
+        UpdateControl<FlinkDeployment> updateControl;
+
+        updateControl = testController.reconcile(appCluster, null);
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                FlinkDeploymentController.OBSERVE_REFRESH_SECONDS * 1000,
+                (long) updateControl.getScheduleDelay().get());
+
+        // Validate reconciliation status
+        ReconciliationStatus reconciliationStatus =
+                appCluster.getStatus().getReconciliationStatus();
+        assertTrue(reconciliationStatus.isSuccess());
+        assertNull(reconciliationStatus.getError());
+        assertEquals(appCluster.getSpec(), 
reconciliationStatus.getLastReconciledSpec());
+
+        updateControl = testController.reconcile(appCluster, null);
+        assertTrue(updateControl.isUpdateStatus());
+        assertFalse(updateControl.getScheduleDelay().isPresent());
+
+        // Validate job status
+        JobStatus jobStatus = appCluster.getStatus().getJobStatus();
+        JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
+        assertEquals(expectedJobStatus.getJobId().toHexString(), 
jobStatus.getJobId());
+        assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
+        assertEquals(expectedJobStatus.getJobState().toString(), 
jobStatus.getState());
+
+        // Send in invalid update
+        appCluster = TestUtils.clone(appCluster);
+        appCluster.getSpec().setJob(null);
+        updateControl = testController.reconcile(appCluster, null);
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                FlinkDeploymentController.RECONCILE_ERROR_REFRESH_SECONDS * 
1000,
+                (long) updateControl.getScheduleDelay().get());
+
+        reconciliationStatus = 
appCluster.getStatus().getReconciliationStatus();
+        assertFalse(reconciliationStatus.isSuccess());
+        assertEquals(
+                "Error while reconciling deployment change: Cannot switch from 
job to session cluster",
+                reconciliationStatus.getError());
+        assertNotNull(reconciliationStatus.getLastReconciledSpec().getJob());
+
+        updateControl = testController.reconcile(appCluster, null);
+        assertTrue(updateControl.isNoUpdate());
+        assertEquals(
+                FlinkDeploymentController.RECONCILE_ERROR_REFRESH_SECONDS * 
1000,
+                (long) updateControl.getScheduleDelay().get());
+
+        // Validate job status correct even with error
+        jobStatus = appCluster.getStatus().getJobStatus();
+        expectedJobStatus = flinkService.listJobs().get(0).f1;
+        assertEquals(expectedJobStatus.getJobId().toHexString(), 
jobStatus.getJobId());
+        assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
+        assertEquals(expectedJobStatus.getJobState().toString(), 
jobStatus.getState());
+    }
+
+    private FlinkDeploymentController createTestController() {
+        JobStatusObserver observer = new JobStatusObserver(flinkService);
+        JobReconciler jobReconciler = new JobReconciler(null, flinkService);
+        SessionReconciler sessionReconciler = new SessionReconciler(null, 
flinkService);
+
+        return new FlinkDeploymentController(
+                null, "test", observer, jobReconciler, sessionReconciler);
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
index e2a1dfd..48f7b9e 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
@@ -40,8 +40,10 @@ public class JobStatusObserverTest {
         FlinkService flinkService = new TestingFlinkService();
         JobStatusObserver observer = new JobStatusObserver(flinkService);
         FlinkDeployment deployment = TestUtils.buildSessionCluster();
-        deployment.setStatus(new FlinkDeploymentStatus());
-        deployment.getStatus().setSpec(deployment.getSpec());
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .setLastReconciledSpec(deployment.getSpec());
         assertTrue(
                 observer.observeFlinkJobStatus(
                         deployment, 
FlinkUtils.getEffectiveConfig(deployment)));
@@ -56,7 +58,10 @@ public class JobStatusObserverTest {
 
         assertTrue(observer.observeFlinkJobStatus(deployment, conf));
         deployment.setStatus(new FlinkDeploymentStatus());
-        deployment.getStatus().setSpec(deployment.getSpec());
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .setLastReconciledSpec(deployment.getSpec());
 
         assertFalse(observer.observeFlinkJobStatus(deployment, conf));
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index 2e2bf78..5d211a4 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -49,7 +49,10 @@ public class JobReconcilerTest {
         List<Tuple2<String, JobStatusMessage>> runningJobs = 
flinkService.listJobs();
         assertEquals(1, runningJobs.size());
         assertNull(runningJobs.get(0).f0);
-        deployment.getStatus().setSpec(deployment.getSpec());
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .setLastReconciledSpec(deployment.getSpec());
 
         JobStatus jobStatus = new JobStatus();
         jobStatus.setJobName(runningJobs.get(0).f1.getJobName());

Reply via email to