This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new a909b52 [FLINK-26135] Introduce ReconciliationStatus and improve
error handling in controller flow
a909b52 is described below
commit a909b5227b9131b4822ef281be1e282c4f8b700f
Author: Gyula Fora <[email protected]>
AuthorDate: Wed Feb 16 16:28:14 2022 +0100
[FLINK-26135] Introduce ReconciliationStatus and improve error handling in
controller flow
Closes #5
---
.../controller/FlinkDeploymentController.java | 82 +++++++++++----
.../operator/crd/status/FlinkDeploymentStatus.java | 4 +-
...oymentStatus.java => ReconciliationStatus.java} | 11 +-
.../InvalidDeploymentException.java} | 21 ++--
.../operator/observer/JobStatusObserver.java | 8 +-
.../operator/reconciler/JobReconciler.java | 45 ++++----
.../operator/reconciler/SessionReconciler.java | 32 +++---
.../flink/kubernetes/operator/TestUtils.java | 2 +
.../controller/FlinkDeploymentControllerTest.java | 113 +++++++++++++++++++++
.../operator/observer/JobStatusObserverTest.java | 11 +-
.../operator/reconciler/JobReconcilerTest.java | 5 +-
11 files changed, 248 insertions(+), 86 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 09eb536..dbafd7d 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -19,6 +19,8 @@ package org.apache.flink.kubernetes.operator.controller;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
@@ -51,7 +53,9 @@ public class FlinkDeploymentController
ErrorStatusHandler<FlinkDeployment>,
EventSourceInitializer<FlinkDeployment> {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkDeploymentController.class);
- private static final int JOB_REFRESH_SECONDS = 5;
+
+ public static final int OBSERVE_REFRESH_SECONDS = 10;
+ public static final int RECONCILE_ERROR_REFRESH_SECONDS = 5;
private final KubernetesClient kubernetesClient;
@@ -90,37 +94,75 @@ public class FlinkDeploymentController
@Override
public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp,
Context context) {
LOG.info("Reconciling {}", flinkApp.getMetadata().getName());
+ if (flinkApp.getStatus() == null) {
+ flinkApp.setStatus(new FlinkDeploymentStatus());
+ }
Configuration effectiveConfig =
FlinkUtils.getEffectiveConfig(flinkApp);
- boolean success = observer.observeFlinkJobStatus(flinkApp,
effectiveConfig);
- if (success) {
- try {
- success = reconcileFlinkDeployment(operatorNamespace,
flinkApp, effectiveConfig);
- } catch (Exception e) {
- throw new RuntimeException(
- "Error while reconciling deployment change for "
- + flinkApp.getMetadata().getName(),
- e);
- }
- }
+ boolean successfulObserve = observer.observeFlinkJobStatus(flinkApp,
effectiveConfig);
- if (!success) {
+ if (!successfulObserve) {
+ // Cluster not accessible let's retry
return UpdateControl.<FlinkDeployment>noUpdate()
- .rescheduleAfter(JOB_REFRESH_SECONDS, TimeUnit.SECONDS);
+ .rescheduleAfter(OBSERVE_REFRESH_SECONDS,
TimeUnit.SECONDS);
+ }
+
+ if (!specChanged(flinkApp)) {
+ // Successfully observed the cluster after reconciliation, no need
to reschedule
+ return UpdateControl.updateStatus(flinkApp);
+ }
+
+ try {
+ reconcileFlinkDeployment(operatorNamespace, flinkApp,
effectiveConfig);
+ } catch (Exception e) {
+ String err = "Error while reconciling deployment change: " +
e.getMessage();
+ String lastErr =
flinkApp.getStatus().getReconciliationStatus().getError();
+ if (!err.equals(lastErr)) {
+ // Log new errors on the first instance
+ LOG.error("Error while reconciling deployment change", e);
+ updateForReconciliationError(flinkApp, err);
+ return UpdateControl.updateStatus(flinkApp)
+ .rescheduleAfter(RECONCILE_ERROR_REFRESH_SECONDS,
TimeUnit.SECONDS);
+ } else {
+ return UpdateControl.<FlinkDeployment>noUpdate()
+ .rescheduleAfter(RECONCILE_ERROR_REFRESH_SECONDS,
TimeUnit.SECONDS);
+ }
}
- flinkApp.getStatus().setSpec(flinkApp.getSpec());
+ // Everything went well, update status and reschedule for observation
+ updateForReconciliationSuccess(flinkApp);
return UpdateControl.updateStatus(flinkApp)
- .rescheduleAfter(JOB_REFRESH_SECONDS, TimeUnit.SECONDS);
+ .rescheduleAfter(OBSERVE_REFRESH_SECONDS, TimeUnit.SECONDS);
}
- private boolean reconcileFlinkDeployment(
+ private boolean specChanged(FlinkDeployment flinkApp) {
+ return !flinkApp.getSpec()
+
.equals(flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec());
+ }
+
+ private void reconcileFlinkDeployment(
String operatorNamespace, FlinkDeployment flinkApp, Configuration
effectiveConfig)
throws Exception {
- return flinkApp.getSpec().getJob() == null
- ? sessionReconciler.reconcile(operatorNamespace, flinkApp,
effectiveConfig)
- : jobReconciler.reconcile(operatorNamespace, flinkApp,
effectiveConfig);
+
+ if (flinkApp.getSpec().getJob() == null) {
+ sessionReconciler.reconcile(operatorNamespace, flinkApp,
effectiveConfig);
+ } else {
+ jobReconciler.reconcile(operatorNamespace, flinkApp,
effectiveConfig);
+ }
+ }
+
+ private void updateForReconciliationSuccess(FlinkDeployment flinkApp) {
+ ReconciliationStatus reconciliationStatus =
flinkApp.getStatus().getReconciliationStatus();
+ reconciliationStatus.setSuccess(true);
+ reconciliationStatus.setError(null);
+ reconciliationStatus.setLastReconciledSpec(flinkApp.getSpec());
+ }
+
+ private void updateForReconciliationError(FlinkDeployment flinkApp, String
err) {
+ ReconciliationStatus reconciliationStatus =
flinkApp.getStatus().getReconciliationStatus();
+ reconciliationStatus.setSuccess(false);
+ reconciliationStatus.setError(err);
}
@Override
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
index 041bd0a..928c827 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
@@ -17,8 +17,6 @@
package org.apache.flink.kubernetes.operator.crd.status;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -29,5 +27,5 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
public class FlinkDeploymentStatus {
private JobStatus jobStatus;
- private FlinkDeploymentSpec spec;
+ private ReconciliationStatus reconciliationStatus = new
ReconciliationStatus();
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
similarity index 82%
copy from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
copy to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
index 041bd0a..22dc565 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
@@ -20,14 +20,17 @@ package org.apache.flink.kubernetes.operator.crd.status;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
-/** Current status of the Flink deployment. */
+/** Status of the Flink deployment reconciliation flow. */
@Data
@NoArgsConstructor
@AllArgsConstructor
-public class FlinkDeploymentStatus {
- private JobStatus jobStatus;
- private FlinkDeploymentSpec spec;
+@Builder
+public class ReconciliationStatus {
+ private boolean success;
+ private String error;
+ private FlinkDeploymentSpec lastReconciledSpec;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/InvalidDeploymentException.java
similarity index 65%
copy from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
copy to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/InvalidDeploymentException.java
index 041bd0a..9ce408e 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/InvalidDeploymentException.java
@@ -15,19 +15,16 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.crd.status;
+package org.apache.flink.kubernetes.operator.exception;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+/** Exception for encountering invalid FlinkDeployment resources. */
+public class InvalidDeploymentException extends Exception {
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+ public InvalidDeploymentException(String msg) {
+ super(msg);
+ }
-/** Current status of the Flink deployment. */
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class FlinkDeploymentStatus {
- private JobStatus jobStatus;
- private FlinkDeploymentSpec spec;
+ public InvalidDeploymentException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index 115d7f5..6297365 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.observer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
@@ -46,12 +47,15 @@ public class JobStatusObserver {
}
public boolean observeFlinkJobStatus(FlinkDeployment flinkApp,
Configuration effectiveConfig) {
- if (flinkApp.getStatus() == null) {
+ FlinkDeploymentSpec lastReconciledSpec =
+
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+ if (lastReconciledSpec == null) {
// This is the first run, nothing to observe
return true;
}
- JobSpec jobSpec = flinkApp.getStatus().getSpec().getJob();
+ JobSpec jobSpec = lastReconciledSpec.getJob();
if (jobSpec == null) {
// This is a session cluster, nothing to observe
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 564efc9..882c58a 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -20,11 +20,12 @@ package org.apache.flink.kubernetes.operator.reconciler;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import
org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -51,36 +52,32 @@ public class JobReconciler {
this.flinkService = flinkService;
}
- public boolean reconcile(
+ public void reconcile(
String operatorNamespace, FlinkDeployment flinkApp, Configuration
effectiveConfig)
throws Exception {
+ FlinkDeploymentSpec lastReconciledSpec =
+
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
JobSpec jobSpec = flinkApp.getSpec().getJob();
- if (flinkApp.getStatus() == null) {
- flinkApp.setStatus(new FlinkDeploymentStatus());
+ if (lastReconciledSpec == null) {
if (!jobSpec.getState().equals(JobState.RUNNING)) {
- throw new RuntimeException("Job must start in running state");
- }
- try {
- deployFlinkJob(
- flinkApp,
- effectiveConfig,
-
Optional.ofNullable(jobSpec.getInitialSavepointPath()));
- IngressUtils.updateIngressRules(
- flinkApp, effectiveConfig, operatorNamespace,
kubernetesClient, false);
- return true;
- } catch (Exception e) {
- LOG.error("Error while deploying " +
flinkApp.getMetadata().getName(), e);
- return false;
+ throw new InvalidDeploymentException("Job must start in
running state");
}
+ deployFlinkJob(
+ flinkApp,
+ effectiveConfig,
+ Optional.ofNullable(jobSpec.getInitialSavepointPath()));
+ IngressUtils.updateIngressRules(
+ flinkApp, effectiveConfig, operatorNamespace,
kubernetesClient, false);
+ return;
}
- boolean specChanged =
!flinkApp.getSpec().equals(flinkApp.getStatus().getSpec());
+ boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
if (specChanged) {
- if (flinkApp.getStatus().getSpec().getJob() == null) {
- throw new RuntimeException("Cannot switch from session to job
cluster");
+ if (lastReconciledSpec.getJob() == null) {
+ throw new InvalidDeploymentException("Cannot switch from
session to job cluster");
}
- JobState currentJobState =
flinkApp.getStatus().getSpec().getJob().getState();
+ JobState currentJobState = lastReconciledSpec.getJob().getState();
JobState desiredJobState = jobSpec.getState();
UpgradeMode upgradeMode = jobSpec.getUpgradeMode();
@@ -103,13 +100,12 @@ public class JobReconciler {
} else if (upgradeMode == UpgradeMode.SAVEPOINT) {
restoreFromLastSavepoint(flinkApp, effectiveConfig);
} else {
- throw new UnsupportedOperationException(
+ throw new InvalidDeploymentException(
"Only savepoint and stateless strategies are
supported at the moment.");
}
}
}
}
- return true;
}
private void deployFlinkJob(
@@ -136,7 +132,8 @@ public class JobReconciler {
String savepointLocation = jobStatus.getSavepointLocation();
if (savepointLocation == null) {
- throw new RuntimeException("Cannot perform stateful restore
without a valid savepoint");
+ throw new InvalidDeploymentException(
+ "Cannot perform stateful restore without a valid
savepoint");
}
deployFlinkJob(flinkApp, effectiveConfig,
Optional.of(savepointLocation));
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index 0c7bae5..2e36fc5 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -19,7 +19,8 @@ package org.apache.flink.kubernetes.operator.reconciler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import
org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
@@ -43,31 +44,28 @@ public class SessionReconciler {
this.flinkService = flinkService;
}
- public boolean reconcile(
+ public void reconcile(
String operatorNamespace, FlinkDeployment flinkApp, Configuration
effectiveConfig)
throws Exception {
- if (flinkApp.getStatus() == null) {
- flinkApp.setStatus(new FlinkDeploymentStatus());
- try {
- flinkService.submitSessionCluster(flinkApp, effectiveConfig);
- IngressUtils.updateIngressRules(
- flinkApp, effectiveConfig, operatorNamespace,
kubernetesClient, false);
- return true;
- } catch (Exception e) {
- LOG.error("Error while deploying " +
flinkApp.getMetadata().getName(), e);
- return false;
- }
+
+ FlinkDeploymentSpec lastReconciledSpec =
+
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+ if (lastReconciledSpec == null) {
+ flinkService.submitSessionCluster(flinkApp, effectiveConfig);
+ IngressUtils.updateIngressRules(
+ flinkApp, effectiveConfig, operatorNamespace,
kubernetesClient, false);
+ return;
}
- boolean specChanged =
!flinkApp.getSpec().equals(flinkApp.getStatus().getSpec());
+ boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
if (specChanged) {
- if (flinkApp.getStatus().getSpec().getJob() != null) {
- throw new RuntimeException("Cannot switch from job to session
cluster");
+ if (lastReconciledSpec.getJob() != null) {
+ throw new InvalidDeploymentException("Cannot switch from job
to session cluster");
}
upgradeSessionCluster(flinkApp, effectiveConfig);
}
- return true;
}
private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration
effectiveConfig)
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 4ef6860..6f27b9b 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.Resource;
import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -44,6 +45,7 @@ public class TestUtils {
public static FlinkDeployment buildSessionCluster() {
FlinkDeployment deployment = new FlinkDeployment();
+ deployment.setStatus(new FlinkDeploymentStatus());
deployment.setMetadata(
new ObjectMetaBuilder()
.withName("test-cluster")
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
new file mode 100644
index 0000000..3b4ae63
--- /dev/null
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** {@link FlinkDeploymentController} unit tests. */
+public class FlinkDeploymentControllerTest {
+
+ private final TestingFlinkService flinkService = new TestingFlinkService();
+
+ @Test
+ public void verifyBasicReconcileLoop() {
+ FlinkDeploymentController testController = createTestController();
+ FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+
+ UpdateControl<FlinkDeployment> updateControl;
+
+ updateControl = testController.reconcile(appCluster, null);
+ assertTrue(updateControl.isUpdateStatus());
+ assertEquals(
+ FlinkDeploymentController.OBSERVE_REFRESH_SECONDS * 1000,
+ (long) updateControl.getScheduleDelay().get());
+
+ // Validate reconciliation status
+ ReconciliationStatus reconciliationStatus =
+ appCluster.getStatus().getReconciliationStatus();
+ assertTrue(reconciliationStatus.isSuccess());
+ assertNull(reconciliationStatus.getError());
+ assertEquals(appCluster.getSpec(),
reconciliationStatus.getLastReconciledSpec());
+
+ updateControl = testController.reconcile(appCluster, null);
+ assertTrue(updateControl.isUpdateStatus());
+ assertFalse(updateControl.getScheduleDelay().isPresent());
+
+ // Validate job status
+ JobStatus jobStatus = appCluster.getStatus().getJobStatus();
+ JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
+ assertEquals(expectedJobStatus.getJobId().toHexString(),
jobStatus.getJobId());
+ assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
+ assertEquals(expectedJobStatus.getJobState().toString(),
jobStatus.getState());
+
+ // Send in invalid update
+ appCluster = TestUtils.clone(appCluster);
+ appCluster.getSpec().setJob(null);
+ updateControl = testController.reconcile(appCluster, null);
+ assertTrue(updateControl.isUpdateStatus());
+ assertEquals(
+ FlinkDeploymentController.RECONCILE_ERROR_REFRESH_SECONDS *
1000,
+ (long) updateControl.getScheduleDelay().get());
+
+ reconciliationStatus =
appCluster.getStatus().getReconciliationStatus();
+ assertFalse(reconciliationStatus.isSuccess());
+ assertEquals(
+ "Error while reconciling deployment change: Cannot switch from
job to session cluster",
+ reconciliationStatus.getError());
+ assertNotNull(reconciliationStatus.getLastReconciledSpec().getJob());
+
+ updateControl = testController.reconcile(appCluster, null);
+ assertTrue(updateControl.isNoUpdate());
+ assertEquals(
+ FlinkDeploymentController.RECONCILE_ERROR_REFRESH_SECONDS *
1000,
+ (long) updateControl.getScheduleDelay().get());
+
+ // Validate job status correct even with error
+ jobStatus = appCluster.getStatus().getJobStatus();
+ expectedJobStatus = flinkService.listJobs().get(0).f1;
+ assertEquals(expectedJobStatus.getJobId().toHexString(),
jobStatus.getJobId());
+ assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
+ assertEquals(expectedJobStatus.getJobState().toString(),
jobStatus.getState());
+ }
+
+ private FlinkDeploymentController createTestController() {
+ JobStatusObserver observer = new JobStatusObserver(flinkService);
+ JobReconciler jobReconciler = new JobReconciler(null, flinkService);
+ SessionReconciler sessionReconciler = new SessionReconciler(null,
flinkService);
+
+ return new FlinkDeploymentController(
+ null, "test", observer, jobReconciler, sessionReconciler);
+ }
+}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
index e2a1dfd..48f7b9e 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
@@ -40,8 +40,10 @@ public class JobStatusObserverTest {
FlinkService flinkService = new TestingFlinkService();
JobStatusObserver observer = new JobStatusObserver(flinkService);
FlinkDeployment deployment = TestUtils.buildSessionCluster();
- deployment.setStatus(new FlinkDeploymentStatus());
- deployment.getStatus().setSpec(deployment.getSpec());
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .setLastReconciledSpec(deployment.getSpec());
assertTrue(
observer.observeFlinkJobStatus(
deployment,
FlinkUtils.getEffectiveConfig(deployment)));
@@ -56,7 +58,10 @@ public class JobStatusObserverTest {
assertTrue(observer.observeFlinkJobStatus(deployment, conf));
deployment.setStatus(new FlinkDeploymentStatus());
- deployment.getStatus().setSpec(deployment.getSpec());
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .setLastReconciledSpec(deployment.getSpec());
assertFalse(observer.observeFlinkJobStatus(deployment, conf));
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index 2e2bf78..5d211a4 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -49,7 +49,10 @@ public class JobReconcilerTest {
List<Tuple2<String, JobStatusMessage>> runningJobs =
flinkService.listJobs();
assertEquals(1, runningJobs.size());
assertNull(runningJobs.get(0).f0);
- deployment.getStatus().setSpec(deployment.getSpec());
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .setLastReconciledSpec(deployment.getSpec());
JobStatus jobStatus = new JobStatus();
jobStatus.setJobName(runningJobs.get(0).f1.getJobName());