This is an automated email from the ASF dual-hosted git repository.

morhidi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 899baa22 [FLINK-30157] Trigger Events Before JM Recovery and Unhealthy Job Restarts
899baa22 is described below

commit 899baa22c21708179e8856396d816c168a970c0c
Author: Matyas Orhidi <[email protected]>
AuthorDate: Tue Nov 22 14:21:31 2022 -0800

    [FLINK-30157] Trigger Events Before JM Recovery and Unhealthy Job Restarts
---
 .../deployment/ApplicationReconciler.java          | 17 ++++++++
 .../kubernetes/operator/utils/EventRecorder.java   |  4 +-
 .../deployment/ApplicationReconcilerTest.java      | 46 +++++++++++++++++++++-
 3 files changed, 65 insertions(+), 2 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 2c39731c..4bb82c5d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -61,6 +61,8 @@ public class ApplicationReconciler
         extends AbstractJobReconciler<FlinkDeployment, FlinkDeploymentSpec, 
FlinkDeploymentStatus> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ApplicationReconciler.class);
+    static final String MSG_RECOVERY = "Recovering lost deployment";
+    static final String MSG_RESTART_UNHEALTHY = "Restarting unhealthy job";
     protected final FlinkService flinkService;
 
     public ApplicationReconciler(
@@ -275,7 +277,22 @@ public class ApplicationReconciler
                 shouldRestartJobBecauseUnhealthy(deployment, observeConfig);
         boolean shouldRecoverDeployment = 
shouldRecoverDeployment(observeConfig, deployment);
         if (shouldRestartJobBecauseUnhealthy || shouldRecoverDeployment) {
+            if (shouldRecoverDeployment) {
+                eventRecorder.triggerEvent(
+                        deployment,
+                        EventRecorder.Type.Warning,
+                        EventRecorder.Reason.RecoverDeployment,
+                        EventRecorder.Component.Job,
+                        MSG_RECOVERY);
+            }
+
             if (shouldRestartJobBecauseUnhealthy) {
+                eventRecorder.triggerEvent(
+                        deployment,
+                        EventRecorder.Type.Warning,
+                        EventRecorder.Reason.RestartUnhealthyJob,
+                        EventRecorder.Component.Job,
+                        MSG_RESTART_UNHEALTHY);
                 cleanupAfterFailedJob(deployment, ctx, observeConfig);
             }
             resubmitJob(deployment, ctx, observeConfig, true);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index a444577e..81baf9b6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -126,6 +126,8 @@ public class EventRecorder {
         Cleanup,
         CleanupFailed,
         Missing,
-        ValidationError
+        ValidationError,
+        RecoverDeployment,
+        RestartUnhealthyJob
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 0eebdcd3..2d8d99b0 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -44,7 +44,10 @@ import 
org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
+import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventCollector;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointStatus;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
@@ -72,6 +75,10 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
+import static 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.MSG_SUBMIT;
+import static 
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.MSG_RECOVERY;
+import static 
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.MSG_RESTART_UNHEALTHY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -90,10 +97,12 @@ public class ApplicationReconcilerTest {
     private Context<FlinkDeployment> context;
     private StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder;
 
+    private EventCollector eventCollector = new EventCollector();
+
     @BeforeEach
     public void before() {
         
kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
-        var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
+        var eventRecorder = new EventRecorder(kubernetesClient, 
eventCollector);
         statusRecorder = new TestingStatusRecorder<FlinkDeployment, 
FlinkDeploymentStatus>();
         flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
@@ -664,4 +673,39 @@ public class ApplicationReconcilerTest {
         reconciler.reconcile(deployment, context);
         assertEquals(JobManagerDeploymentStatus.MISSING, 
status.getJobManagerDeploymentStatus());
     }
+
+    @Test
+    public void testDeploymentRecoveryEvent() throws Exception {
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        reconciler.reconcile(deployment, context);
+        Assertions.assertEquals(MSG_SUBMIT, 
eventCollector.events.remove().getMessage());
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+        flinkService.clear();
+        FlinkDeploymentStatus deploymentStatus = deployment.getStatus();
+        
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+        deploymentStatus
+                .getJobStatus()
+                
.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+        reconciler.reconcile(deployment, context);
+        Assertions.assertEquals(MSG_RECOVERY, 
eventCollector.events.remove().getMessage());
+    }
+
+    @Test
+    public void testRestartUnhealthyEvent() throws Exception {
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED.key(), "true");
+        reconciler.reconcile(deployment, context);
+        Assertions.assertEquals(MSG_SUBMIT, 
eventCollector.events.remove().getMessage());
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+        var clusterHealthInfo = new 
ClusterHealthInfo(System.currentTimeMillis(), 2, false);
+        ClusterHealthEvaluator.setLastValidClusterHealthInfo(
+                deployment.getStatus().getClusterInfo(), clusterHealthInfo);
+        reconciler.reconcile(deployment, context);
+        Assertions.assertEquals(MSG_RESTART_UNHEALTHY, 
eventCollector.events.remove().getMessage());
+    }
 }

Reply via email to