This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 4bac805 [FLINK-26432] Cleanly separate validator, observer and reconciler modules
4bac805 is described below
commit 4bac8059ec5d3c68da19beec22f29c434562516e
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Feb 28 06:17:11 2022 +0100
[FLINK-26432] Cleanly separate validator, observer and reconciler modules
closes #26
---
.../flink/kubernetes/operator/FlinkOperator.java | 3 +
.../controller/FlinkDeploymentController.java | 20 ++-
.../operator/crd/status/FlinkDeploymentStatus.java | 6 +-
.../JobManagerDeploymentStatus.java | 4 +-
.../operator/observer/JobStatusObserver.java | 115 ---------------
.../kubernetes/operator/observer/Observer.java | 161 +++++++++++++++++++++
.../operator/reconciler/BaseReconciler.java | 74 +---------
.../operator/reconciler/JobReconciler.java | 27 +---
.../operator/reconciler/SessionReconciler.java | 7 -
.../flink/kubernetes/operator/TestUtils.java | 17 +++
.../kubernetes/operator/TestingFlinkService.java | 7 +-
.../controller/FlinkDeploymentControllerTest.java | 47 +++---
.../operator/observer/JobStatusObserverTest.java | 77 ----------
.../kubernetes/operator/observer/ObserverTest.java | 135 +++++++++++++++++
.../operator/reconciler/JobReconcilerTest.java | 4 +-
.../crds/flinkdeployments.flink.apache.org-v1.yml | 7 +
16 files changed, 394 insertions(+), 317 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 8fa72b9..fd9d60c 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -21,6 +21,7 @@ import
org.apache.flink.kubernetes.operator.config.DefaultConfig;
import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
import
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
+import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -55,6 +56,7 @@ public class FlinkOperator {
FlinkService flinkService = new FlinkService(client);
+ Observer observer = new Observer(flinkService);
JobReconciler jobReconciler = new JobReconciler(client, flinkService);
SessionReconciler sessionReconciler = new SessionReconciler(client,
flinkService);
@@ -66,6 +68,7 @@ public class FlinkOperator {
client,
namespace,
validator,
+ observer,
jobReconciler,
sessionReconciler);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 5dff8f4..b9d5cd4 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -23,6 +23,7 @@ import
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import
org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler;
import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
@@ -64,6 +65,7 @@ public class FlinkDeploymentController
private final String operatorNamespace;
private final FlinkDeploymentValidator validator;
+ private final Observer observer;
private final JobReconciler jobReconciler;
private final SessionReconciler sessionReconciler;
private final DefaultConfig defaultConfig;
@@ -73,12 +75,14 @@ public class FlinkDeploymentController
KubernetesClient kubernetesClient,
String operatorNamespace,
FlinkDeploymentValidator validator,
+ Observer observer,
JobReconciler jobReconciler,
SessionReconciler sessionReconciler) {
this.defaultConfig = defaultConfig;
this.kubernetesClient = kubernetesClient;
this.operatorNamespace = operatorNamespace;
this.validator = validator;
+ this.observer = observer;
this.jobReconciler = jobReconciler;
this.sessionReconciler = sessionReconciler;
}
@@ -86,12 +90,12 @@ public class FlinkDeploymentController
@Override
public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
LOG.info("Stopping cluster {}", flinkApp.getMetadata().getName());
+ Configuration effectiveConfig =
+ FlinkUtils.getEffectiveConfig(flinkApp,
defaultConfig.getFlinkConfig());
+
+ observer.observe(flinkApp, context, effectiveConfig);
return getReconciler(flinkApp)
- .shutdownAndDelete(
- operatorNamespace,
- flinkApp,
- context,
- FlinkUtils.getEffectiveConfig(flinkApp,
defaultConfig.getFlinkConfig()));
+ .shutdownAndDelete(operatorNamespace, flinkApp,
effectiveConfig);
}
@Override
@@ -107,6 +111,12 @@ public class FlinkDeploymentController
Configuration effectiveConfig =
FlinkUtils.getEffectiveConfig(flinkApp,
defaultConfig.getFlinkConfig());
+
+ boolean readyToReconcile = observer.observe(flinkApp, context,
effectiveConfig);
+ if (!readyToReconcile) {
+ return
flinkApp.getStatus().getJobManagerDeploymentStatus().toUpdateControl(flinkApp);
+ }
+
try {
UpdateControl<FlinkDeployment> updateControl =
getReconciler(flinkApp)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
index 928c827..e192163 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
@@ -17,6 +17,8 @@
package org.apache.flink.kubernetes.operator.crd.status;
+import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -26,6 +28,8 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
public class FlinkDeploymentStatus {
- private JobStatus jobStatus;
+ private JobStatus jobStatus = new JobStatus();
+ private JobManagerDeploymentStatus jobManagerDeploymentStatus =
+ JobManagerDeploymentStatus.MISSING;
private ReconciliationStatus reconciliationStatus = new
ReconciliationStatus();
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobManagerDeploymentStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
similarity index 97%
rename from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobManagerDeploymentStatus.java
rename to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
index 5e634a9..08262ff 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobManagerDeploymentStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.reconciler;
+package org.apache.flink.kubernetes.operator.observer;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
@@ -44,12 +44,12 @@ public enum JobManagerDeploymentStatus {
public UpdateControl<FlinkDeployment> toUpdateControl(FlinkDeployment
flinkDeployment) {
switch (this) {
case DEPLOYING:
+ case READY:
return UpdateControl.updateStatus(flinkDeployment)
.rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
case DEPLOYED_NOT_READY:
return UpdateControl.updateStatus(flinkDeployment)
.rescheduleAfter(PORT_READY_DELAY_SECONDS,
TimeUnit.SECONDS);
- case READY:
case MISSING:
default:
return null;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
deleted file mode 100644
index 577d73b..0000000
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.operator.observer;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
-import org.apache.flink.kubernetes.operator.crd.spec.JobState;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
-import org.apache.flink.runtime.client.JobStatusMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-/** Observes the actual state of the running jobs on the Flink cluster. */
-public class JobStatusObserver {
-
- private static final Logger LOG =
LoggerFactory.getLogger(JobStatusObserver.class);
-
- private final FlinkService flinkService;
-
- public JobStatusObserver(FlinkService flinkService) {
- this.flinkService = flinkService;
- }
-
- public boolean observeFlinkJobStatus(FlinkDeployment flinkApp,
Configuration effectiveConfig)
- throws Exception {
- FlinkDeploymentSpec lastReconciledSpec =
-
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
-
- if (lastReconciledSpec == null) {
- // This is the first run, nothing to observe
- return true;
- }
-
- JobSpec jobSpec = lastReconciledSpec.getJob();
-
- if (jobSpec == null) {
- // This is a session cluster, nothing to observe
- return true;
- }
-
- if (!jobSpec.getState().equals(JobState.RUNNING)) {
- // The job is not running, nothing to observe
- return true;
- }
- LOG.info("Getting job statuses for {}",
flinkApp.getMetadata().getName());
- FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
-
- Collection<JobStatusMessage> clusterJobStatuses =
flinkService.listJobs(effectiveConfig);
- if (clusterJobStatuses.isEmpty()) {
- LOG.info("No jobs found on {} yet",
flinkApp.getMetadata().getName());
- return false;
- } else {
- flinkAppStatus.setJobStatus(
- mergeJobStatus(
- flinkAppStatus.getJobStatus(), new
ArrayList<>(clusterJobStatuses)));
- LOG.info("Job statuses updated for {}",
flinkApp.getMetadata().getName());
- return true;
- }
- }
-
- /** Merge previous job status with the new one from the flink job cluster.
*/
- private JobStatus mergeJobStatus(
- JobStatus oldStatus, List<JobStatusMessage> clusterJobStatuses) {
- JobStatus newStatus = oldStatus;
- Collections.sort(
- clusterJobStatuses,
- (j1, j2) -> -1 * Long.compare(j1.getStartTime(),
j2.getStartTime()));
- JobStatusMessage newJob = clusterJobStatuses.get(0);
-
- if (newStatus == null) {
- newStatus = createJobStatus(newJob);
- } else {
- newStatus.setState(newJob.getJobState().name());
- newStatus.setJobName(newJob.getJobName());
- newStatus.setJobId(newJob.getJobId().toHexString());
- // track the start time, changing timestamp would cause busy
reconciliation
- newStatus.setUpdateTime(String.valueOf(newJob.getStartTime()));
- }
- return newStatus;
- }
-
- public static JobStatus createJobStatus(JobStatusMessage message) {
- JobStatus jobStatus = new JobStatus();
- jobStatus.setJobId(message.getJobId().toHexString());
- jobStatus.setJobName(message.getJobName());
- jobStatus.setState(message.getJobState().name());
- jobStatus.setUpdateTime(String.valueOf(message.getStartTime()));
- return jobStatus;
- }
-}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
new file mode 100644
index 0000000..2772fbb
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+ private final FlinkService flinkService;
+
+ public Observer(FlinkService flinkService) {
+ this.flinkService = flinkService;
+ }
+
+ public boolean observe(
+ FlinkDeployment flinkApp, Context context, Configuration
effectiveConfig) {
+ observeJmDeployment(flinkApp, context, effectiveConfig);
+ return isReadyToReconcile(flinkApp, effectiveConfig);
+ }
+
+ private void observeJmDeployment(
+ FlinkDeployment flinkApp, Context context, Configuration
effectiveConfig) {
+ FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+ JobManagerDeploymentStatus previousJmStatus =
+ deploymentStatus.getJobManagerDeploymentStatus();
+
+ if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+ return;
+ }
+
+ if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus)
{
+
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+ return;
+ }
+
+ Optional<Deployment> deployment =
context.getSecondaryResource(Deployment.class);
+ if (deployment.isPresent()) {
+ DeploymentStatus status = deployment.get().getStatus();
+ DeploymentSpec spec = deployment.get().getSpec();
+ if (status != null
+ && status.getAvailableReplicas() != null
+ && spec.getReplicas().intValue() == status.getReplicas()
+ && spec.getReplicas().intValue() ==
status.getAvailableReplicas()
+ && flinkService.isJobManagerPortReady(effectiveConfig)) {
+
+ // typically it takes a few seconds for the REST server to be
ready
+ LOG.info(
+ "JobManager deployment {} in namespace {} port ready,
waiting for the REST API...",
+ flinkApp.getMetadata().getName(),
+ flinkApp.getMetadata().getNamespace());
+ deploymentStatus.setJobManagerDeploymentStatus(
+ JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+ return;
+ }
+ LOG.info(
+ "JobManager deployment {} in namespace {} exists but not
ready yet, status {}",
+ flinkApp.getMetadata().getName(),
+ flinkApp.getMetadata().getNamespace(),
+ status);
+
+
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+ return;
+ }
+
+
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+ }
+
+ private boolean observeFlinkJobStatus(FlinkDeployment flinkApp,
Configuration effectiveConfig) {
+
+ // No need to observe job status for session clusters
+ if (flinkApp.getSpec().getJob() == null) {
+ return true;
+ }
+
+ LOG.info("Getting job statuses for {}",
flinkApp.getMetadata().getName());
+ FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
+
+ Collection<JobStatusMessage> clusterJobStatuses;
+ try {
+ clusterJobStatuses = flinkService.listJobs(effectiveConfig);
+ } catch (Exception e) {
+ LOG.error("Exception while listing jobs", e);
+ flinkAppStatus.getJobStatus().setState("UNKNOWN");
+ return false;
+ }
+ if (clusterJobStatuses.isEmpty()) {
+ LOG.info("No jobs found on {} yet",
flinkApp.getMetadata().getName());
+ return false;
+ } else {
+ updateJobStatus(flinkAppStatus.getJobStatus(), new
ArrayList<>(clusterJobStatuses));
+ LOG.info("Job statuses updated for {}",
flinkApp.getMetadata().getName());
+ return true;
+ }
+ }
+
+ private boolean isReadyToReconcile(FlinkDeployment flinkApp, Configuration
effectiveConfig) {
+ JobManagerDeploymentStatus jmDeploymentStatus =
+ flinkApp.getStatus().getJobManagerDeploymentStatus();
+
+ switch (jmDeploymentStatus) {
+ case READY:
+ return observeFlinkJobStatus(flinkApp, effectiveConfig);
+ case MISSING:
+ return true;
+ case DEPLOYING:
+ case DEPLOYED_NOT_READY:
+ return false;
+ default:
+ throw new RuntimeException("Unknown status: " +
jmDeploymentStatus);
+ }
+ }
+
+ /** Update previous job status based on the job list from the cluster. */
+ private void updateJobStatus(JobStatus status, List<JobStatusMessage>
clusterJobStatuses) {
+ Collections.sort(
+ clusterJobStatuses, (j1, j2) ->
Long.compare(j2.getStartTime(), j1.getStartTime()));
+ JobStatusMessage newJob = clusterJobStatuses.get(0);
+
+ status.setState(newJob.getJobState().name());
+ status.setJobName(newJob.getJobName());
+ status.setJobId(newJob.getJobId().toHexString());
+ // track the start time, changing timestamp would cause busy
reconciliation
+ status.setUpdateTime(String.valueOf(newJob.getStartTime()));
+ }
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
index d6cbb29..b94956b 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
@@ -19,13 +19,11 @@ package org.apache.flink.kubernetes.operator.reconciler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
-import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
@@ -33,9 +31,6 @@ import
io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashSet;
-import java.util.Optional;
-
/** BaseReconciler with functionality that is common to job and session modes.
*/
public abstract class BaseReconciler {
@@ -44,8 +39,6 @@ public abstract class BaseReconciler {
public static final int REFRESH_SECONDS = 60;
public static final int PORT_READY_DELAY_SECONDS = 10;
- private final HashSet<String> jobManagerDeployments = new HashSet<>();
-
protected final KubernetesClient kubernetesClient;
protected final FlinkService flinkService;
@@ -54,10 +47,6 @@ public abstract class BaseReconciler {
this.flinkService = flinkService;
}
- public boolean removeDeployment(FlinkDeployment flinkApp) {
- return jobManagerDeployments.remove(flinkApp.getMetadata().getUid());
- }
-
public abstract UpdateControl<FlinkDeployment> reconcile(
String operatorNamespace,
FlinkDeployment flinkApp,
@@ -65,70 +54,15 @@ public abstract class BaseReconciler {
Configuration effectiveConfig)
throws Exception;
- protected JobManagerDeploymentStatus checkJobManagerDeployment(
- FlinkDeployment flinkApp, Context context, Configuration
effectiveConfig) {
- if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {
- Optional<Deployment> deployment =
context.getSecondaryResource(Deployment.class);
- if (deployment.isPresent()) {
- DeploymentStatus status = deployment.get().getStatus();
- DeploymentSpec spec = deployment.get().getSpec();
- if (status != null
- && status.getAvailableReplicas() != null
- && spec.getReplicas().intValue() ==
status.getReplicas()
- && spec.getReplicas().intValue() ==
status.getAvailableReplicas()) {
- // typically it takes a few seconds for the REST server to
be ready
- if (flinkService.isJobManagerPortReady(effectiveConfig)) {
- LOG.info(
- "JobManager deployment {} in namespace {} is
ready",
- flinkApp.getMetadata().getName(),
- flinkApp.getMetadata().getNamespace());
-
jobManagerDeployments.add(flinkApp.getMetadata().getUid());
- if (flinkApp.getStatus().getJobStatus() != null) {
- // pre-existing deployments on operator restart -
proceed with
- // reconciliation
- return JobManagerDeploymentStatus.READY;
- }
- }
- LOG.info(
- "JobManager deployment {} in namespace {} port not
ready",
- flinkApp.getMetadata().getName(),
- flinkApp.getMetadata().getNamespace());
- return JobManagerDeploymentStatus.DEPLOYED_NOT_READY;
- }
- LOG.info(
- "JobManager deployment {} in namespace {} not yet
ready, status {}",
- flinkApp.getMetadata().getName(),
- flinkApp.getMetadata().getNamespace(),
- status);
-
- return JobManagerDeploymentStatus.DEPLOYING;
- }
- return JobManagerDeploymentStatus.MISSING;
- }
- return JobManagerDeploymentStatus.READY;
- }
-
- /**
- * Shuts down the job and deletes all kubernetes resources including k8s
HA resources. It will
- * first perform a graceful shutdown (cancel) before deleting the data if
that is unsuccessful
- * in order to avoid leaking HA metadata to durable storage.
- *
- * <p>This feature is limited at the moment to cleaning up native
kubernetes HA resources, other
- * HA providers like ZK need to be cleaned up manually after deletion.
- */
public DeleteControl shutdownAndDelete(
- String operatorNamespace,
- FlinkDeployment flinkApp,
- Context context,
- Configuration effectiveConfig) {
+ String operatorNamespace, FlinkDeployment flinkApp, Configuration
effectiveConfig) {
- if (checkJobManagerDeployment(flinkApp, context, effectiveConfig)
- == JobManagerDeploymentStatus.READY) {
+ if (JobManagerDeploymentStatus.READY
+ == flinkApp.getStatus().getJobManagerDeploymentStatus()) {
shutdown(flinkApp, effectiveConfig);
} else {
FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);
}
- removeDeployment(flinkApp);
IngressUtils.updateIngressRules(
flinkApp, effectiveConfig, operatorNamespace,
kubernetesClient, true);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index ee121e9..3abb860 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -25,7 +25,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
@@ -48,11 +48,8 @@ public class JobReconciler extends BaseReconciler {
private static final Logger LOG =
LoggerFactory.getLogger(JobReconciler.class);
- private final JobStatusObserver observer;
-
public JobReconciler(KubernetesClient kubernetesClient, FlinkService
flinkService) {
super(kubernetesClient, flinkService);
- this.observer = new JobStatusObserver(flinkService);
}
@Override
@@ -72,19 +69,7 @@ public class JobReconciler extends BaseReconciler {
Optional.ofNullable(jobSpec.getInitialSavepointPath()));
IngressUtils.updateIngressRules(
flinkApp, effectiveConfig, operatorNamespace,
kubernetesClient, false);
- }
-
- // wait until the deployment is ready
- UpdateControl<FlinkDeployment> uc =
- checkJobManagerDeployment(flinkApp, context, effectiveConfig)
- .toUpdateControl(flinkApp);
- if (uc != null) {
- return uc;
- }
-
- if (!observer.observeFlinkJobStatus(flinkApp, effectiveConfig)) {
- return UpdateControl.updateStatus(flinkApp)
- .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+ return
JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(flinkApp);
}
// TODO: following assumes that current job is running
@@ -174,15 +159,17 @@ public class JobReconciler extends BaseReconciler {
effectiveConfig);
JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
jobStatus.setState("suspended");
- removeDeployment(flinkApp);
+
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
savepointOpt.ifPresent(jobStatus::setSavepointLocation);
return savepointOpt;
}
@Override
protected void shutdown(FlinkDeployment flinkApp, Configuration
effectiveConfig) {
- if (flinkApp.getStatus().getJobStatus() != null
- && flinkApp.getStatus().getJobStatus().getJobId() != null) {
+ if (org.apache.flink.api.common.JobStatus.RUNNING
+ .name()
+
.equalsIgnoreCase(flinkApp.getStatus().getJobStatus().getState())) {
+ LOG.info("Job is running, attempting graceful shutdown.");
try {
flinkService.cancelJob(
JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId()),
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index e72de56..d63fdeb 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -60,13 +60,6 @@ public class SessionReconciler extends BaseReconciler {
flinkApp, effectiveConfig, operatorNamespace,
kubernetesClient, false);
}
- UpdateControl<FlinkDeployment> uc =
- checkJobManagerDeployment(flinkApp, context, effectiveConfig)
- .toUpdateControl(flinkApp);
- if (uc != null) {
- return uc;
- }
-
boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
if (specChanged) {
upgradeSessionCluster(flinkApp, effectiveConfig);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 9ada6f8..055d91b 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -33,9 +33,12 @@ import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
/** Testing utilities. */
public class TestUtils {
@@ -109,4 +112,18 @@ public class TestUtils {
pod.setSpec(podSpec);
return pod;
}
+
+ public static Context createEmptyContext() {
+ return new Context() {
+ @Override
+ public Optional<RetryInfo> getRetryInfo() {
+ return Optional.empty();
+ }
+
+ @Override
+ public <T> Optional<T> getSecondaryResource(Class<T> aClass,
String s) {
+ return Optional.empty();
+ }
+ };
+ }
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 610176b..2aab5ab 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -41,6 +41,7 @@ public class TestingFlinkService extends FlinkService {
private List<Tuple2<String, JobStatusMessage>> jobs = new ArrayList<>();
private Set<String> sessions = new HashSet<>();
+ private boolean isPortReady = true;
public TestingFlinkService() {
super(null);
@@ -104,6 +105,10 @@ public class TestingFlinkService extends FlinkService {
@Override
public boolean isJobManagerPortReady(Configuration config) {
- return true;
+ return isPortReady;
+ }
+
+ public void setPortReady(boolean isPortReady) {
+ this.isPortReady = isPortReady;
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index ee7a386..f4468c6 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
-import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler;
-import org.apache.flink.kubernetes.operator.reconciler.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
@@ -60,20 +60,12 @@ public class FlinkDeploymentControllerTest {
UpdateControl<FlinkDeployment> updateControl;
- updateControl = testController.reconcile(appCluster, context);
+ updateControl = testController.reconcile(appCluster, TestUtils.createEmptyContext());
assertTrue(updateControl.isUpdateStatus());
assertEquals(
- JobManagerDeploymentStatus.DEPLOYED_NOT_READY
- .toUpdateControl(appCluster)
- .getScheduleDelay(),
+ JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(appCluster).getScheduleDelay(),
updateControl.getScheduleDelay());
- updateControl = testController.reconcile(appCluster, context);
- assertTrue(updateControl.isUpdateStatus());
- assertEquals(
- BaseReconciler.REFRESH_SECONDS * 1000,
- (long) updateControl.getScheduleDelay().get());
-
// Validate reconciliation status
ReconciliationStatus reconciliationStatus =
appCluster.getStatus().getReconciliationStatus();
@@ -84,8 +76,16 @@ public class FlinkDeploymentControllerTest {
updateControl = testController.reconcile(appCluster, context);
assertTrue(updateControl.isUpdateStatus());
assertEquals(
- BaseReconciler.REFRESH_SECONDS * 1000,
- (long) updateControl.getScheduleDelay().get());
+ JobManagerDeploymentStatus.DEPLOYED_NOT_READY
+ .toUpdateControl(appCluster)
+ .getScheduleDelay(),
+ updateControl.getScheduleDelay());
+
+ updateControl = testController.reconcile(appCluster, context);
+ assertTrue(updateControl.isUpdateStatus());
+ assertEquals(
+ JobManagerDeploymentStatus.READY.toUpdateControl(appCluster).getScheduleDelay(),
+ updateControl.getScheduleDelay());
// Validate job status
JobStatus jobStatus = appCluster.getStatus().getJobStatus();
@@ -122,7 +122,7 @@ public class FlinkDeploymentControllerTest {
appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
appCluster.getSpec().getJob().setInitialSavepointPath("s0");
- testController.reconcile(appCluster, context);
+ testController.reconcile(appCluster, TestUtils.createEmptyContext());
List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs();
assertEquals(1, jobs.size());
assertEquals("s0", jobs.get(0).f0);
@@ -143,16 +143,20 @@ public class FlinkDeploymentControllerTest {
jobs = flinkService.listJobs();
assertEquals(1, jobs.size());
assertEquals("savepoint_0", jobs.get(0).f0);
+ testController.reconcile(appCluster, context);
// Suspend job
appCluster = TestUtils.clone(appCluster);
appCluster.getSpec().getJob().setState(JobState.SUSPENDED);
testController.reconcile(appCluster, context);
+ assertEquals(
+ JobManagerDeploymentStatus.MISSING,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
// Resume from last savepoint
appCluster = TestUtils.clone(appCluster);
appCluster.getSpec().getJob().setState(JobState.RUNNING);
- testController.reconcile(appCluster, context);
+ testController.reconcile(appCluster, TestUtils.createEmptyContext());
jobs = flinkService.listJobs();
assertEquals(1, jobs.size());
assertEquals("savepoint_1", jobs.get(0).f0);
@@ -171,7 +175,7 @@ public class FlinkDeploymentControllerTest {
appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
appCluster.getSpec().getJob().setInitialSavepointPath("s0");
- testController.reconcile(appCluster, context);
+ testController.reconcile(appCluster, TestUtils.createEmptyContext());
List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs();
assertEquals(1, jobs.size());
assertEquals("s0", jobs.get(0).f0);
@@ -180,6 +184,13 @@ public class FlinkDeploymentControllerTest {
appCluster = TestUtils.clone(appCluster);
appCluster.getSpec().getJob().setParallelism(100);
+ UpdateControl<FlinkDeployment> updateControl =
+ testController.reconcile(appCluster, context);
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYED_NOT_READY
+ .toUpdateControl(appCluster)
+ .getScheduleDelay(),
+ updateControl.getScheduleDelay());
testController.reconcile(appCluster, context);
jobs = flinkService.listJobs();
assertEquals(1, jobs.size());
@@ -200,6 +211,7 @@ public class FlinkDeploymentControllerTest {
}
private FlinkDeploymentController createTestController(TestingFlinkService flinkService) {
+ Observer observer = new Observer(flinkService);
JobReconciler jobReconciler = new JobReconciler(null, flinkService);
SessionReconciler sessionReconciler = new SessionReconciler(null, flinkService);
@@ -208,6 +220,7 @@ public class FlinkDeploymentControllerTest {
null,
"test",
new DefaultDeploymentValidator(),
+ observer,
jobReconciler,
sessionReconciler);
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
deleted file mode 100644
index 769af93..0000000
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.operator.observer;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.TestUtils;
-import org.apache.flink.kubernetes.operator.TestingFlinkService;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.spec.JobState;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/** @link JobStatusObserver unit tests */
-public class JobStatusObserverTest {
-
- @Test
- public void observeSessionCluster() throws Exception {
- FlinkService flinkService = new TestingFlinkService();
- JobStatusObserver observer = new JobStatusObserver(flinkService);
- FlinkDeployment deployment = TestUtils.buildSessionCluster();
- deployment
- .getStatus()
- .getReconciliationStatus()
- .setLastReconciledSpec(deployment.getSpec());
- assertTrue(
- observer.observeFlinkJobStatus(
- deployment,
- FlinkUtils.getEffectiveConfig(deployment, new Configuration())));
- }
-
- @Test
- public void observeApplicationCluster() throws Exception {
- TestingFlinkService flinkService = new TestingFlinkService();
- JobStatusObserver observer = new JobStatusObserver(flinkService);
- FlinkDeployment deployment = TestUtils.buildApplicationCluster();
- Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
-
- assertTrue(observer.observeFlinkJobStatus(deployment, conf));
- deployment.setStatus(new FlinkDeploymentStatus());
- deployment
- .getStatus()
- .getReconciliationStatus()
- .setLastReconciledSpec(deployment.getSpec());
-
- flinkService.submitApplicationCluster(deployment, conf);
- assertTrue(observer.observeFlinkJobStatus(deployment, conf));
-
- assertEquals(
- deployment.getMetadata().getName(),
- deployment.getStatus().getJobStatus().getJobName());
-
- deployment.getSpec().getJob().setState(JobState.SUSPENDED);
- flinkService.clear();
- assertTrue(observer.observeFlinkJobStatus(deployment, conf));
- }
-}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
new file mode 100644
index 0000000..4e7615c
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** @link Observer unit tests */
+public class ObserverTest {
+
+ private final Context readyContext =
+ JobReconcilerTest.createContextWithReadyJobManagerDeployment();
+
+ @Test
+ public void observeSessionCluster() {
+ FlinkService flinkService = new TestingFlinkService();
+ Observer observer = new Observer(flinkService);
+ FlinkDeployment deployment = TestUtils.buildSessionCluster();
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .setLastReconciledSpec(deployment.getSpec());
+
+ assertFalse(
+ observer.observe(
+ deployment,
+ readyContext,
+ FlinkUtils.getEffectiveConfig(deployment, new Configuration())));
+
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+
+ assertTrue(
+ observer.observe(
+ deployment,
+ readyContext,
+ FlinkUtils.getEffectiveConfig(deployment, new Configuration())));
+
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+ }
+
+ @Test
+ public void observeApplicationCluster() {
+ TestingFlinkService flinkService = new TestingFlinkService();
+ Observer observer = new Observer(flinkService);
+ FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
+
+ assertTrue(observer.observe(deployment, TestUtils.createEmptyContext(), conf));
+
+ deployment.setStatus(new FlinkDeploymentStatus());
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .setLastReconciledSpec(deployment.getSpec());
+ deployment.getStatus().setJobStatus(new JobStatus());
+ flinkService.submitApplicationCluster(deployment, conf);
+
+ // Validate port check logic
+ flinkService.setPortReady(false);
+
+ // Port not ready
+ assertFalse(observer.observe(deployment, readyContext, conf));
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+
+ assertFalse(observer.observe(deployment, readyContext, conf));
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+
+ flinkService.setPortReady(true);
+ // Port ready but we have to recheck once again
+ assertFalse(observer.observe(deployment, readyContext, conf));
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+
+ // Stable ready
+ assertTrue(observer.observe(deployment, readyContext, conf));
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+
+ assertTrue(observer.observe(deployment, readyContext, conf));
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+
+ assertEquals(
+ deployment.getMetadata().getName(),
+ deployment.getStatus().getJobStatus().getJobName());
+
+ // Test listing failure
+ flinkService.clear();
+ assertFalse(observer.observe(deployment, readyContext, conf));
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+ assertEquals("UNKNOWN", deployment.getStatus().getJobStatus().getState());
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index 6a2109b..2ee8793 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -115,7 +115,7 @@ public class JobReconcilerTest {
final String expectedSavepointPath = "savepoint_0";
final Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment();
final TestingFlinkService flinkService = new TestingFlinkService();
- JobStatusObserver observer = new JobStatusObserver(flinkService);
+ Observer observer = new Observer(flinkService);
final JobReconciler reconciler = new JobReconciler(null, flinkService);
final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
diff --git a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 5ae554c..ec9ee63 100644
--- a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9077,6 +9077,13 @@ spec:
savepointLocation:
type: string
type: object
+ jobManagerDeploymentStatus:
+ enum:
+ - READY
+ - DEPLOYED_NOT_READY
+ - DEPLOYING
+ - MISSING
+ type: string
reconciliationStatus:
properties:
success: