This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 8a30e9bd770b32e28d2a7c0fe1830f5f6d9ab090
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Feb 28 18:42:43 2023 +0100

    [FLINK-31277] Fix deployment recovery logic HA meta check
---
 .../AbstractFlinkResourceReconciler.java           |  5 ++-
 .../deployment/ApplicationReconciler.java          |  8 ++--
 .../controller/DeploymentRecoveryTest.java         | 44 ++++++++++++++++++++++
 3 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 540884b8..2098c07c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -428,7 +428,10 @@ public abstract class AbstractFlinkResourceReconciler<
 
             if (jmMissingForRunningDeployment(deployment)) {
                 LOG.debug("Jobmanager deployment is missing, trying to recover");
-                if (HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) {
+                var jobSpec = deployment.getSpec().getJob();
+                boolean stateless =
+                        jobSpec != null && jobSpec.getUpgradeMode() == UpgradeMode.STATELESS;
+                if (stateless || HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) {
                     LOG.debug("HA is enabled, recovering lost jobmanager deployment");
                     result = true;
                 } else {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index f68960f9..c6111c8b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -280,10 +280,10 @@ public class ApplicationReconciler
                         MSG_RESTART_UNHEALTHY);
                 cleanupAfterFailedJob(ctx);
             }
-            boolean requireHaMetadata =
-                    ReconciliationUtils.getDeployedSpec(ctx.getResource()).getJob().getUpgradeMode()
-                            != UpgradeMode.STATELESS;
-            resubmitJob(ctx, requireHaMetadata);
+
+            resubmitJob(
+                    ctx,
+                    HighAvailabilityMode.isHighAvailabilityModeActivated(ctx.getObserveConfig()));
             return true;
         }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
index 22b893db..1030e461 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.controller;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -163,4 +164,47 @@ public class DeploymentRecoveryTest {
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
     }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersionsAndUpgradeModes")
+    public void verifyRecoveryWithoutHaData(FlinkVersion flinkVersion, UpgradeMode upgradeMode)
+            throws Exception {
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion);
+        appCluster.getSpec().getJob().setUpgradeMode(upgradeMode);
+
+        // We disable HA for stateless to test recovery without HA metadata
+        if (upgradeMode == UpgradeMode.STATELESS) {
+            appCluster
+                    .getSpec()
+                    .getFlinkConfiguration()
+                    .put(HighAvailabilityOptions.HA_MODE.key(), "none");
+        }
+
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // Remove deployment
+        flinkService.setPortReady(false);
+        flinkService.clear();
+
+        // Set HA metadata not available
+        flinkService.setHaDataAvailable(false);
+
+        testController.reconcile(appCluster, context);
+
+        if (upgradeMode == UpgradeMode.STATELESS) {
+            assertEquals(
+                    JobManagerDeploymentStatus.DEPLOYING,
+                    appCluster.getStatus().getJobManagerDeploymentStatus());
+        } else {
+            assertEquals(
+                    JobManagerDeploymentStatus.MISSING,
+                    appCluster.getStatus().getJobManagerDeploymentStatus());
+        }
+    }
 }

Reply via email to