This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new ee20209  [FLINK-26261] Introduce status and create event for terminal 
deployment error
ee20209 is described below

commit ee20209d95b7a155be0761eb1b60829fe489924f
Author: Thomas Weise <[email protected]>
AuthorDate: Mon Mar 7 20:32:02 2022 -0800

    [FLINK-26261] Introduce status and create event for terminal deployment 
error
    
    Closes #46
---
 .../controller/FlinkDeploymentController.java      | 48 ++++++++++++-----
 .../exception/DeploymentFailedException.java       | 63 ++++++++++++++++++++++
 .../exception/InvalidDeploymentException.java      | 30 -----------
 .../observer/JobManagerDeploymentStatus.java       |  9 +++-
 .../kubernetes/operator/observer/Observer.java     | 16 ++++++
 .../flink/kubernetes/operator/TestUtils.java       | 62 +++++++++++++++++++++
 .../controller/FlinkDeploymentControllerTest.java  | 56 ++++++++++++++++++-
 .../kubernetes/operator/observer/ObserverTest.java |  6 +--
 .../operator/reconciler/JobReconcilerTest.java     | 34 ++----------
 .../crds/flinkdeployments.flink.apache.org-v1.yml  |  1 +
 10 files changed, 242 insertions(+), 83 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 26c0097..2a2a8d3 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -21,8 +21,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.DefaultConfig;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import 
org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
+import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
@@ -31,6 +32,7 @@ import 
org.apache.flink.kubernetes.operator.utils.OperatorUtils;
 import 
org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
 import org.apache.flink.util.Preconditions;
 
+import io.fabric8.kubernetes.api.model.Event;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
@@ -92,8 +94,11 @@ public class FlinkDeploymentController
         LOG.info("Stopping cluster {}", flinkApp.getMetadata().getName());
         Configuration effectiveConfig =
                 FlinkUtils.getEffectiveConfig(flinkApp, 
defaultConfig.getFlinkConfig());
-
-        observer.observe(flinkApp, context, effectiveConfig);
+        try {
+            observer.observe(flinkApp, context, effectiveConfig);
+        } catch (DeploymentFailedException dfe) {
+            // ignore during cleanup
+        }
         return reconcilerFactory
                 .getOrCreate(flinkApp)
                 .cleanup(operatorNamespace, flinkApp, effectiveConfig);
@@ -113,28 +118,45 @@ public class FlinkDeploymentController
         Configuration effectiveConfig =
                 FlinkUtils.getEffectiveConfig(flinkApp, 
defaultConfig.getFlinkConfig());
 
-        boolean readyToReconcile = observer.observe(flinkApp, context, 
effectiveConfig);
-        if (!readyToReconcile) {
-            return flinkApp.getStatus()
-                    .getJobManagerDeploymentStatus()
-                    .toUpdateControl(flinkApp, operatorConfiguration);
-        }
-
         try {
+            boolean readyToReconcile = observer.observe(flinkApp, context, 
effectiveConfig);
+            if (!readyToReconcile) {
+                return flinkApp.getStatus()
+                        .getJobManagerDeploymentStatus()
+                        .toUpdateControl(flinkApp, operatorConfiguration);
+            }
+
             UpdateControl<FlinkDeployment> updateControl =
                     reconcilerFactory
                             .getOrCreate(flinkApp)
                             .reconcile(operatorNamespace, flinkApp, context, 
effectiveConfig);
             return updateControl;
-        } catch (InvalidDeploymentException ide) {
-            LOG.error("Reconciliation failed", ide);
-            ReconciliationUtils.updateForReconciliationError(flinkApp, 
ide.getMessage());
+        } catch (DeploymentFailedException dfe) {
+            handleDeploymentFailed(flinkApp, dfe);
             return UpdateControl.updateStatus(flinkApp);
         } catch (Exception e) {
             throw new ReconciliationException(e);
         }
     }
 
+    private void handleDeploymentFailed(FlinkDeployment flinkApp, 
DeploymentFailedException dfe) {
+        LOG.error(
+                "Deployment {}/{} failed with {}",
+                flinkApp.getMetadata().getNamespace(),
+                flinkApp.getMetadata().getName(),
+                dfe.getMessage());
+        
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
+        ReconciliationUtils.updateForReconciliationError(flinkApp, 
dfe.getMessage());
+
+        // TODO: avoid repeated event
+        Event event = DeploymentFailedException.asEvent(dfe, flinkApp);
+        kubernetesClient
+                .v1()
+                .events()
+                .inNamespace(flinkApp.getMetadata().getNamespace())
+                .create(event);
+    }
+
     @Override
     public List<EventSource> 
prepareEventSources(EventSourceContext<FlinkDeployment> ctx) {
         Preconditions.checkNotNull(controllerConfig, "Controller config cannot 
be null");
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
new file mode 100644
index 0000000..a16851f
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.exception;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.fabric8.kubernetes.api.model.Event;
+import io.fabric8.kubernetes.api.model.EventBuilder;
+import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
+
+/** Exception to signal terminal deployment failure. */
+public class DeploymentFailedException extends RuntimeException {
+    public static final String COMPONENT_JOBMANAGER = "JobManagerDeployment";
+
+    public final String component;
+    public final DeploymentCondition deployCondition;
+
+    public DeploymentFailedException(String component, DeploymentCondition 
deployCondition) {
+        super(deployCondition.getMessage());
+        this.component = component;
+        this.deployCondition = deployCondition;
+    }
+
+    public static Event asEvent(DeploymentFailedException dfe, FlinkDeployment 
flinkApp) {
+        EventBuilder evtb =
+                new EventBuilder()
+                        .withApiVersion("v1")
+                        .withNewInvolvedObject()
+                        .withKind(flinkApp.getKind())
+                        .withName(flinkApp.getMetadata().getName())
+                        .withNamespace(flinkApp.getMetadata().getNamespace())
+                        .withUid(flinkApp.getMetadata().getUid())
+                        .endInvolvedObject()
+                        .withType(dfe.deployCondition.getType())
+                        .withReason(dfe.deployCondition.getReason())
+                        
.withFirstTimestamp(dfe.deployCondition.getLastTransitionTime())
+                        
.withLastTimestamp(dfe.deployCondition.getLastUpdateTime())
+                        .withMessage(dfe.getMessage())
+                        .withNewMetadata()
+                        .withGenerateName(flinkApp.getMetadata().getName())
+                        .withNamespace(flinkApp.getMetadata().getNamespace())
+                        .endMetadata()
+                        .withNewSource()
+                        .withComponent(dfe.component)
+                        .endSource();
+        return evtb.build();
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/InvalidDeploymentException.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/InvalidDeploymentException.java
deleted file mode 100644
index 9ce408e..0000000
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/InvalidDeploymentException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.operator.exception;
-
-/** Exception for encountering invalid FlinkDeployment resources. */
-public class InvalidDeploymentException extends Exception {
-
-    public InvalidDeploymentException(String msg) {
-        super(msg);
-    }
-
-    public InvalidDeploymentException(String msg, Throwable cause) {
-        super(msg, cause);
-    }
-}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
index 960c1cd..35a395d 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
@@ -37,7 +37,11 @@ public enum JobManagerDeploymentStatus {
     DEPLOYING,
 
     /** JobManager deployment not found, probably not started or killed by 
user. */
-    MISSING;
+    // TODO: currently a mix of SUSPENDED and ERROR, needs cleanup
+    MISSING,
+
+    /** Deployment in terminal error, requires spec change for reconciliation 
to continue. */
+    ERROR;
 
     public UpdateControl<FlinkDeployment> toUpdateControl(
             FlinkDeployment flinkDeployment, FlinkOperatorConfiguration 
operatorConfiguration) {
@@ -54,8 +58,9 @@ public enum JobManagerDeploymentStatus {
                                 
operatorConfiguration.getPortCheckIntervalInSec(),
                                 TimeUnit.SECONDS);
             case MISSING:
+            case ERROR:
             default:
-                return null;
+                return UpdateControl.noUpdate();
         }
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
index 5bafee9..46fb983 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
@@ -23,12 +23,14 @@ import 
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
 import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
 import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -100,6 +102,19 @@ public class Observer {
                     flinkApp.getMetadata().getNamespace(),
                     status);
 
+            List<DeploymentCondition> conditions = status.getConditions();
+            for (DeploymentCondition dc : conditions) {
+                if ("FailedCreate".equals(dc.getReason())
+                        && "ReplicaFailure".equals(dc.getType())) {
+                    // throw only when not already in error status to allow 
for spec update
+                    if (!JobManagerDeploymentStatus.ERROR.equals(
+                            deploymentStatus.getJobManagerDeploymentStatus())) 
{
+                        throw new DeploymentFailedException(
+                                
DeploymentFailedException.COMPONENT_JOBMANAGER, dc);
+                    }
+                    return;
+                }
+            }
             
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
             return;
         }
@@ -181,6 +196,7 @@ public class Observer {
                 return observeFlinkJobStatus(flinkApp, effectiveConfig)
                         && observeSavepointStatus(flinkApp, effectiveConfig);
             case MISSING:
+            case ERROR:
                 return true;
             case DEPLOYING:
             case DEPLOYED_NOT_READY:
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 009b17a..d76b98a 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -31,6 +31,10 @@ import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
 
@@ -109,4 +113,62 @@ public class TestUtils {
             }
         };
     }
+
+    public static Context createContextWithReadyJobManagerDeployment() {
+        return new Context() {
+            @Override
+            public Optional<RetryInfo> getRetryInfo() {
+                return Optional.empty();
+            }
+
+            @Override
+            public <T> Optional<T> getSecondaryResource(
+                    Class<T> expectedType, String eventSourceName) {
+                DeploymentStatus status = new DeploymentStatus();
+                status.setAvailableReplicas(1);
+                status.setReplicas(1);
+                DeploymentSpec spec = new DeploymentSpec();
+                spec.setReplicas(1);
+                Deployment deployment = new Deployment();
+                deployment.setSpec(spec);
+                deployment.setStatus(status);
+                return Optional.of((T) deployment);
+            }
+        };
+    }
+
+    public static final String DEPLOYMENT_ERROR = "test deployment error 
message";
+
+    public static Context createContextWithFailedJobManagerDeployment() {
+        return new Context() {
+            @Override
+            public Optional<RetryInfo> getRetryInfo() {
+                return Optional.empty();
+            }
+
+            @Override
+            public <T> Optional<T> getSecondaryResource(
+                    Class<T> expectedType, String eventSourceName) {
+                DeploymentStatus status = new DeploymentStatus();
+                status.setAvailableReplicas(0);
+                status.setReplicas(1);
+                List<DeploymentCondition> conditions =
+                        Collections.singletonList(
+                                new DeploymentCondition(
+                                        null,
+                                        null,
+                                        DEPLOYMENT_ERROR,
+                                        "FailedCreate",
+                                        "status",
+                                        "ReplicaFailure"));
+                status.setConditions(conditions);
+                DeploymentSpec spec = new DeploymentSpec();
+                spec.setReplicas(1);
+                Deployment deployment = new Deployment();
+                deployment.setSpec(spec);
+                deployment.setStatus(status);
+                return Optional.of((T) deployment);
+            }
+        };
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 55d1628..04e9925 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -29,13 +29,13 @@ import 
org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.observer.Observer;
-import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
 import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import 
org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
+import io.fabric8.kubernetes.api.model.EventBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
@@ -43,13 +43,16 @@ import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import io.javaoperatorsdk.operator.processing.event.source.EventSource;
 import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.net.HttpURLConnection;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -62,7 +65,7 @@ import static org.junit.Assert.assertTrue;
 /** @link JobStatusObserver unit tests */
 public class FlinkDeploymentControllerTest {
 
-    private final Context context = 
JobReconcilerTest.createContextWithReadyJobManagerDeployment();
+    private final Context context = 
TestUtils.createContextWithReadyJobManagerDeployment();
     private final FlinkOperatorConfiguration operatorConfiguration =
             FlinkOperatorConfiguration.fromConfiguration(new Configuration());
 
@@ -152,6 +155,55 @@ public class FlinkDeploymentControllerTest {
     }
 
     @Test
+    public void verifyFailedDeployment() throws Exception {
+
+        mockServer
+                .expect()
+                .post()
+                .withPath("/api/v1/namespaces/flink-operator-test/events")
+                .andReturn(
+                        HttpURLConnection.HTTP_CREATED,
+                        new 
EventBuilder().withNewMetadata().endMetadata().build())
+                .once();
+
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+        UpdateControl<FlinkDeployment> updateControl;
+
+        updateControl =
+                testController.reconcile(
+                        appCluster, 
TestUtils.createContextWithFailedJobManagerDeployment());
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(Optional.empty(), updateControl.getScheduleDelay());
+
+        RecordedRequest recordedRequest = mockServer.getLastRequest();
+        assertEquals("POST", recordedRequest.getMethod());
+        
assertTrue(recordedRequest.getBody().readUtf8().contains(TestUtils.DEPLOYMENT_ERROR));
+        assertEquals(
+                JobManagerDeploymentStatus.ERROR,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // Validate reconciliation status
+        ReconciliationStatus reconciliationStatus =
+                appCluster.getStatus().getReconciliationStatus();
+        assertFalse(reconciliationStatus.isSuccess());
+        assertNotNull(reconciliationStatus.getError());
+
+        // next cycle should not create another event
+        updateControl =
+                testController.reconcile(
+                        appCluster, 
TestUtils.createContextWithFailedJobManagerDeployment());
+        assertEquals(
+                JobManagerDeploymentStatus.ERROR,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.READY
+                        .toUpdateControl(appCluster, operatorConfiguration)
+                        .getScheduleDelay(),
+                updateControl.getScheduleDelay());
+    }
+
+    @Test
     public void verifyUpgradeFromSavepoint() {
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
         appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
index c5d73d4..0b4289f 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
@@ -40,10 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /** @link Observer unit tests */
 public class ObserverTest {
 
-    private final Context readyContext =
-            JobReconcilerTest.createContextWithReadyJobManagerDeployment();
-    private final FlinkOperatorConfiguration operatorConfiguration =
-            FlinkOperatorConfiguration.fromConfiguration(new Configuration());
+    private final Context readyContext = 
TestUtils.createContextWithReadyJobManagerDeployment();
 
     @Test
     public void observeSessionCluster() {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index cc7f508..c712501 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -30,15 +30,10 @@ import 
org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
-import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
-import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
-import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -50,32 +45,9 @@ public class JobReconcilerTest {
     private final FlinkOperatorConfiguration operatorConfiguration =
             FlinkOperatorConfiguration.fromConfiguration(new Configuration());
 
-    public static Context createContextWithReadyJobManagerDeployment() {
-        return new Context() {
-            @Override
-            public Optional<RetryInfo> getRetryInfo() {
-                return Optional.empty();
-            }
-
-            @Override
-            public <T> Optional<T> getSecondaryResource(
-                    Class<T> expectedType, String eventSourceName) {
-                DeploymentStatus status = new DeploymentStatus();
-                status.setAvailableReplicas(1);
-                status.setReplicas(1);
-                DeploymentSpec spec = new DeploymentSpec();
-                spec.setReplicas(1);
-                Deployment deployment = new Deployment();
-                deployment.setSpec(spec);
-                deployment.setStatus(status);
-                return Optional.of((T) deployment);
-            }
-        };
-    }
-
     @Test
     public void testUpgrade() throws Exception {
-        Context context = 
JobReconcilerTest.createContextWithReadyJobManagerDeployment();
+        Context context = 
TestUtils.createContextWithReadyJobManagerDeployment();
         TestingFlinkService flinkService = new TestingFlinkService();
 
         JobReconciler reconciler = new JobReconciler(null, flinkService, 
operatorConfiguration);
@@ -116,7 +88,7 @@ public class JobReconcilerTest {
     @Test
     public void testUpgradeModeChangeFromSavepointToLastState() throws 
Exception {
         final String expectedSavepointPath = "savepoint_0";
-        final Context context = 
JobReconcilerTest.createContextWithReadyJobManagerDeployment();
+        final Context context = 
TestUtils.createContextWithReadyJobManagerDeployment();
         final TestingFlinkService flinkService = new TestingFlinkService();
 
         final JobReconciler reconciler =
@@ -168,7 +140,7 @@ public class JobReconcilerTest {
 
     @Test
     public void triggerSavepoint() throws Exception {
-        Context context = 
JobReconcilerTest.createContextWithReadyJobManagerDeployment();
+        Context context = 
TestUtils.createContextWithReadyJobManagerDeployment();
         TestingFlinkService flinkService = new TestingFlinkService();
         JobReconciler reconciler = new JobReconciler(null, flinkService, 
operatorConfiguration);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
diff --git a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml 
b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 078c0ed..4ab435f 100644
--- a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9097,6 +9097,7 @@ spec:
                 - DEPLOYED_NOT_READY
                 - DEPLOYING
                 - MISSING
+                - ERROR
                 type: string
               reconciliationStatus:
                 properties:

Reply via email to