This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 0b44a9a [FLINK-26768] Clear errors when observed JobManager deployment not in ERROR status
0b44a9a is described below
commit 0b44a9a5695c0658224c972611c42e14d1e89d6a
Author: wangyang0918 <[email protected]>
AuthorDate: Wed Mar 23 12:50:26 2022 +0800
[FLINK-26768] Clear errors when observed JobManager deployment not in ERROR status
This closes #101.
---
.../kubernetes/operator/observer/BaseObserver.java | 10 ++++
.../kubernetes/operator/observer/JobObserver.java | 1 +
.../operator/observer/SessionObserver.java | 1 +
.../controller/FlinkDeploymentControllerTest.java | 67 ++++++++++++++++------
4 files changed, 63 insertions(+), 16 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
index 51d300b..85d47fd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -144,4 +145,13 @@ public abstract class BaseObserver implements Observer {
protected boolean isClusterReady(FlinkDeployment dep) {
return dep.getStatus().getJobManagerDeploymentStatus() ==
JobManagerDeploymentStatus.READY;
}
+
+ protected void
clearErrorsIfJobManagerDeploymentNotInErrorStatus(FlinkDeployment dep) {
+ if (dep.getStatus().getJobManagerDeploymentStatus() !=
JobManagerDeploymentStatus.ERROR) {
+ final ReconciliationStatus reconciliationStatus =
+ dep.getStatus().getReconciliationStatus();
+ reconciliationStatus.setSuccess(true);
+ reconciliationStatus.setError(null);
+ }
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
index b8a1db3..0802dfe 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
@@ -56,6 +56,7 @@ public class JobObserver extends BaseObserver {
observeSavepointStatus(flinkApp, effectiveConfig);
}
}
+ clearErrorsIfJobManagerDeploymentNotInErrorStatus(flinkApp);
}
private boolean observeFlinkJobStatus(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
index b70c177..80a4fbf 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
@@ -52,5 +52,6 @@ public class SessionObserver extends BaseObserver {
}
}
}
+ clearErrorsIfJobManagerDeploymentNotInErrorStatus(flinkApp);
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 24d8f3f..96eedd0 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -39,6 +39,7 @@ import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
import io.fabric8.kubernetes.api.model.EventBuilder;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.PodListBuilder;
import io.fabric8.kubernetes.api.model.PodStatusBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -215,22 +216,7 @@ public class FlinkDeploymentControllerTest {
.once();
String crashLoopMessage = "container fails";
- ContainerStatus cs =
- new ContainerStatusBuilder()
- .withNewState()
- .withNewWaiting()
-
.withReason(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF)
- .withMessage(crashLoopMessage)
- .endWaiting()
- .endState()
- .build();
-
- Pod pod = TestUtils.getTestPod("host", "apiVersion",
Collections.emptyList());
- pod.setStatus(
- new PodStatusBuilder()
- .withContainerStatuses(Collections.singletonList(cs))
- .build());
- flinkService.setJmPodList(new PodListBuilder().withItems(pod).build());
+ flinkService.setJmPodList(createFailedPodList(crashLoopMessage));
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
UpdateControl<FlinkDeployment> updateControl;
@@ -479,6 +465,55 @@ public class FlinkDeploymentControllerTest {
eventSources.stream().map(EventSource::name).collect(Collectors.toSet()));
}
+ @Test
+ public void testSuccessfulObservationShouldClearErrors() {
+ final String crashLoopMessage = "deploy errors";
+ flinkService.setJmPodList(createFailedPodList(crashLoopMessage));
+
+ FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+
+ testController.reconcile(appCluster, TestUtils.createEmptyContext());
+ testController.reconcile(appCluster,
TestUtils.createContextWithInProgressDeployment());
+
+ // Failed JobManager deployment should set errors to the status
+ assertEquals(
+ JobManagerDeploymentStatus.ERROR,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ ReconciliationStatus reconciliationStatus =
+ appCluster.getStatus().getReconciliationStatus();
+ assertFalse(reconciliationStatus.isSuccess());
+ assertEquals(crashLoopMessage, reconciliationStatus.getError());
+
+ // JobManager deployment becomes ready and successful observation
should clear the errors
+ testController.reconcile(appCluster, context);
+ testController.reconcile(appCluster, context);
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ reconciliationStatus =
appCluster.getStatus().getReconciliationStatus();
+ assertTrue(reconciliationStatus.isSuccess());
+ assertNull(reconciliationStatus.getError());
+ }
+
+ private PodList createFailedPodList(String crashLoopMessage) {
+ ContainerStatus cs =
+ new ContainerStatusBuilder()
+ .withNewState()
+ .withNewWaiting()
+
.withReason(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF)
+ .withMessage(crashLoopMessage)
+ .endWaiting()
+ .endState()
+ .build();
+
+ Pod pod = TestUtils.getTestPod("host", "apiVersion",
Collections.emptyList());
+ pod.setStatus(
+ new PodStatusBuilder()
+ .withContainerStatuses(Collections.singletonList(cs))
+ .build());
+ return new PodListBuilder().withItems(pod).build();
+ }
+
private FlinkDeploymentController createTestController(
KubernetesClient kubernetesClient, TestingFlinkService
flinkService) {