This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 8a30e9bd770b32e28d2a7c0fe1830f5f6d9ab090 Author: Gyula Fora <[email protected]> AuthorDate: Tue Feb 28 18:42:43 2023 +0100 [FLINK-31277] Fix deployment recovery logic HA meta check --- .../AbstractFlinkResourceReconciler.java | 5 ++- .../deployment/ApplicationReconciler.java | 8 ++-- .../controller/DeploymentRecoveryTest.java | 44 ++++++++++++++++++++++ 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java index 540884b8..2098c07c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java @@ -428,7 +428,10 @@ public abstract class AbstractFlinkResourceReconciler< if (jmMissingForRunningDeployment(deployment)) { LOG.debug("Jobmanager deployment is missing, trying to recover"); - if (HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) { + var jobSpec = deployment.getSpec().getJob(); + boolean stateless = + jobSpec != null && jobSpec.getUpgradeMode() == UpgradeMode.STATELESS; + if (stateless || HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) { LOG.debug("HA is enabled, recovering lost jobmanager deployment"); result = true; } else { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index f68960f9..c6111c8b 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -280,10 +280,10 @@ public class ApplicationReconciler MSG_RESTART_UNHEALTHY); cleanupAfterFailedJob(ctx); } - boolean requireHaMetadata = - ReconciliationUtils.getDeployedSpec(ctx.getResource()).getJob().getUpgradeMode() - != UpgradeMode.STATELESS; - resubmitJob(ctx, requireHaMetadata); + + resubmitJob( + ctx, + HighAvailabilityMode.isHighAvailabilityModeActivated(ctx.getObserveConfig())); return true; } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java index 22b893db..1030e461 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java @@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.controller; import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; @@ -163,4 +164,47 @@ public class DeploymentRecoveryTest { JobManagerDeploymentStatus.READY, appCluster.getStatus().getJobManagerDeploymentStatus()); } + + @ParameterizedTest + @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersionsAndUpgradeModes") + public void verifyRecoveryWithoutHaData(FlinkVersion flinkVersion, UpgradeMode upgradeMode) + throws Exception { + FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion); + appCluster.getSpec().getJob().setUpgradeMode(upgradeMode); + + // We disable HA for stateless to test recovery without HA metadata + if (upgradeMode == UpgradeMode.STATELESS) { + appCluster + .getSpec() + .getFlinkConfiguration() + .put(HighAvailabilityOptions.HA_MODE.key(), "none"); + } + + testController.reconcile(appCluster, context); + testController.reconcile(appCluster, context); + testController.reconcile(appCluster, context); + + assertEquals( + JobManagerDeploymentStatus.READY, + appCluster.getStatus().getJobManagerDeploymentStatus()); + + // Remove deployment + flinkService.setPortReady(false); + flinkService.clear(); + + // Set HA metadata not available + flinkService.setHaDataAvailable(false); + + testController.reconcile(appCluster, context); + + if (upgradeMode == UpgradeMode.STATELESS) { + assertEquals( + JobManagerDeploymentStatus.DEPLOYING, + appCluster.getStatus().getJobManagerDeploymentStatus()); + } else { + assertEquals( + JobManagerDeploymentStatus.MISSING, + appCluster.getStatus().getJobManagerDeploymentStatus()); + } + } }
