This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b44a9a  [FLINK-26768] Clear errors when observed JobManager deployment not in ERROR status
0b44a9a is described below

commit 0b44a9a5695c0658224c972611c42e14d1e89d6a
Author: wangyang0918 <[email protected]>
AuthorDate: Wed Mar 23 12:50:26 2022 +0800

    [FLINK-26768] Clear errors when observed JobManager deployment not in ERROR status
    
    This closes #101.
---
 .../kubernetes/operator/observer/BaseObserver.java | 10 ++++
 .../kubernetes/operator/observer/JobObserver.java  |  1 +
 .../operator/observer/SessionObserver.java         |  1 +
 .../controller/FlinkDeploymentControllerTest.java  | 67 ++++++++++++++++------
 4 files changed, 63 insertions(+), 16 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
index 51d300b..85d47fd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 
@@ -144,4 +145,13 @@ public abstract class BaseObserver implements Observer {
     protected boolean isClusterReady(FlinkDeployment dep) {
         return dep.getStatus().getJobManagerDeploymentStatus() == 
JobManagerDeploymentStatus.READY;
     }
+
+    protected void 
clearErrorsIfJobManagerDeploymentNotInErrorStatus(FlinkDeployment dep) {
+        if (dep.getStatus().getJobManagerDeploymentStatus() != 
JobManagerDeploymentStatus.ERROR) {
+            final ReconciliationStatus reconciliationStatus =
+                    dep.getStatus().getReconciliationStatus();
+            reconciliationStatus.setSuccess(true);
+            reconciliationStatus.setError(null);
+        }
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
index b8a1db3..0802dfe 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
@@ -56,6 +56,7 @@ public class JobObserver extends BaseObserver {
                 observeSavepointStatus(flinkApp, effectiveConfig);
             }
         }
+        clearErrorsIfJobManagerDeploymentNotInErrorStatus(flinkApp);
     }
 
     private boolean observeFlinkJobStatus(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
index b70c177..80a4fbf 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
@@ -52,5 +52,6 @@ public class SessionObserver extends BaseObserver {
                 }
             }
         }
+        clearErrorsIfJobManagerDeploymentNotInErrorStatus(flinkApp);
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 24d8f3f..96eedd0 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -39,6 +39,7 @@ import io.fabric8.kubernetes.api.model.ContainerStatus;
 import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
 import io.fabric8.kubernetes.api.model.EventBuilder;
 import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.api.model.PodListBuilder;
 import io.fabric8.kubernetes.api.model.PodStatusBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -215,22 +216,7 @@ public class FlinkDeploymentControllerTest {
                 .once();
 
         String crashLoopMessage = "container fails";
-        ContainerStatus cs =
-                new ContainerStatusBuilder()
-                        .withNewState()
-                        .withNewWaiting()
-                        
.withReason(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF)
-                        .withMessage(crashLoopMessage)
-                        .endWaiting()
-                        .endState()
-                        .build();
-
-        Pod pod = TestUtils.getTestPod("host", "apiVersion", 
Collections.emptyList());
-        pod.setStatus(
-                new PodStatusBuilder()
-                        .withContainerStatuses(Collections.singletonList(cs))
-                        .build());
-        flinkService.setJmPodList(new PodListBuilder().withItems(pod).build());
+        flinkService.setJmPodList(createFailedPodList(crashLoopMessage));
 
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
         UpdateControl<FlinkDeployment> updateControl;
@@ -479,6 +465,55 @@ public class FlinkDeploymentControllerTest {
                 
eventSources.stream().map(EventSource::name).collect(Collectors.toSet()));
     }
 
+    @Test
+    public void testSuccessfulObservationShouldClearErrors() {
+        final String crashLoopMessage = "deploy errors";
+        flinkService.setJmPodList(createFailedPodList(crashLoopMessage));
+
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+
+        testController.reconcile(appCluster, TestUtils.createEmptyContext());
+        testController.reconcile(appCluster, 
TestUtils.createContextWithInProgressDeployment());
+
+        // Failed JobManager deployment should set errors to the status
+        assertEquals(
+                JobManagerDeploymentStatus.ERROR,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        ReconciliationStatus reconciliationStatus =
+                appCluster.getStatus().getReconciliationStatus();
+        assertFalse(reconciliationStatus.isSuccess());
+        assertEquals(crashLoopMessage, reconciliationStatus.getError());
+
+        // JobManager deployment becomes ready and successful observation 
should clear the errors
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        reconciliationStatus = 
appCluster.getStatus().getReconciliationStatus();
+        assertTrue(reconciliationStatus.isSuccess());
+        assertNull(reconciliationStatus.getError());
+    }
+
+    private PodList createFailedPodList(String crashLoopMessage) {
+        ContainerStatus cs =
+                new ContainerStatusBuilder()
+                        .withNewState()
+                        .withNewWaiting()
+                        
.withReason(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF)
+                        .withMessage(crashLoopMessage)
+                        .endWaiting()
+                        .endState()
+                        .build();
+
+        Pod pod = TestUtils.getTestPod("host", "apiVersion", 
Collections.emptyList());
+        pod.setStatus(
+                new PodStatusBuilder()
+                        .withContainerStatuses(Collections.singletonList(cs))
+                        .build());
+        return new PodListBuilder().withItems(pod).build();
+    }
+
     private FlinkDeploymentController createTestController(
             KubernetesClient kubernetesClient, TestingFlinkService 
flinkService) {
 

Reply via email to