This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 824d96fa [FLINK-38077] Make sure jobmanager is accessible when trying to cancel for suspend/upgrade
824d96fa is described below

commit 824d96facaafd61d15b92ae1bf80ab8e76b80a85
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Jul 10 12:46:06 2025 +0200

    [FLINK-38077] Make sure jobmanager is accessible when trying to cancel for suspend/upgrade
---
 .../operator/api/status/CommonStatus.java          | 12 ++++++
 .../operator/api/status/FlinkDeploymentStatus.java |  7 ++++
 .../api/status/JobManagerDeploymentStatus.java     |  4 ++
 .../operator/reconciler/ReconciliationUtils.java   |  4 --
 .../deployment/AbstractJobReconciler.java          |  4 +-
 .../deployment/ApplicationReconciler.java          |  7 ++--
 .../operator/service/AbstractFlinkService.java     |  2 +-
 .../ApplicationReconcilerUpgradeModeTest.java      | 45 ++++++++++++++++------
 8 files changed, 63 insertions(+), 22 deletions(-)

diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
index 30649e89..0de792da 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState
 import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import io.fabric8.crd.generator.annotation.PrinterColumn;
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -29,6 +30,8 @@ import lombok.NoArgsConstructor;
 import lombok.experimental.SuperBuilder;
 import org.apache.commons.lang3.StringUtils;
 
+import static org.apache.flink.api.common.JobStatus.RECONCILING;
+
 /** Last observed common status of the Flink deployment/Flink SessionJob. */
 @Experimental
 @Data
@@ -121,4 +124,13 @@ public abstract class CommonStatus<SPEC extends 
AbstractFlinkSpec> {
 
         return ResourceLifecycleState.DEPLOYED;
     }
+
+    @JsonIgnore
+    public boolean isJobCancellable() {
+        var jobState = jobStatus.getState();
+        if (jobState == null) {
+            return false;
+        }
+        return RECONCILING != jobState && !jobState.isGloballyTerminalState();
+    }
 }
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
index 136d3415..9a341299 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.api.status;
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -55,4 +56,10 @@ public class FlinkDeploymentStatus extends 
CommonStatus<FlinkDeploymentSpec> {
 
     /** Information about the TaskManagers for the scale subresource. */
     private TaskManagerInfo taskManager;
+
+    @JsonIgnore
+    @Override
+    public boolean isJobCancellable() {
+        return super.isJobCancellable() && 
jobManagerDeploymentStatus.isRestApiAvailable();
+    }
 }
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
index 54a0181b..2c86c49a 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
@@ -35,4 +35,8 @@ public enum JobManagerDeploymentStatus {
 
     /** Deployment in terminal error, requires spec change for reconciliation 
to continue. */
     ERROR;
+
+    public boolean isRestApiAvailable() {
+        return this == READY;
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 2936501a..94140c3f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -379,10 +379,6 @@ public class ReconciliationUtils {
         return CANCELED == status.getJobStatus().getState();
     }
 
-    public static boolean isJobCancellable(CommonStatus<?> status) {
-        return RECONCILING != status.getJobStatus().getState();
-    }
-
     public static boolean isJobCancelling(CommonStatus<?> status) {
         return status.getJobStatus() != null && CANCELLING == 
status.getJobStatus().getState();
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 3fcf8e46..9a4480d0 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -277,7 +277,7 @@ public abstract class AbstractJobReconciler<
                 if (running && savepointPossible) {
                     LOG.info("Using savepoint to upgrade Flink version");
                     return JobUpgrade.savepoint(false);
-                } else if 
(ReconciliationUtils.isJobCancellable(resource.getStatus())) {
+                } else if (resource.getStatus().isJobCancellable()) {
                     LOG.info("Using last-state upgrade with cancellation to 
upgrade Flink version");
                     return JobUpgrade.lastStateUsingCancel();
                 } else {
@@ -354,7 +354,7 @@ public abstract class AbstractJobReconciler<
 
     private boolean allowLastStateCancel(FlinkResourceContext<CR> ctx) {
         var resource = ctx.getResource();
-        if (!ReconciliationUtils.isJobCancellable(resource.getStatus())) {
+        if (!resource.getStatus().isJobCancellable()) {
             return false;
         }
         if (resource instanceof FlinkSessionJob) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 180fb128..ce8baca5 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -88,12 +88,13 @@ public class ApplicationReconciler
         var status = deployment.getStatus();
         var availableUpgradeMode = super.getJobUpgrade(ctx, deployConfig);
 
-        if (availableUpgradeMode.isAvailable() || 
!availableUpgradeMode.isAllowFallback()) {
+        if (availableUpgradeMode.isAvailable()) {
             return availableUpgradeMode;
         }
         var flinkService = ctx.getFlinkService();
 
-        if (HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)
+        if (availableUpgradeMode.isAllowFallback()
+                && 
HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)
                 && 
HighAvailabilityMode.isHighAvailabilityModeActivated(ctx.getObserveConfig())
                 && flinkService.isHaMetadataAvailable(deployConfig)) {
             LOG.info(
@@ -125,7 +126,7 @@ public class ApplicationReconciler
                     "UpgradeFailed");
         }
 
-        return JobUpgrade.unavailable();
+        return availableUpgradeMode;
     }
 
     private void deleteJmThatNeverStarted(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index f0a7b25b..18fc3690 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -336,7 +336,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                     savepointPath = savepointJobOrError(clusterClient, status, 
conf);
                     break;
                 case STATELESS:
-                    if (ReconciliationUtils.isJobCancellable(status)) {
+                    if (status.isJobCancellable()) {
                         try {
                             cancelJobOrError(clusterClient, status, true);
                         } catch (Exception ex) {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index 3b50af2a..c9f70586 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -72,6 +72,7 @@ import static 
org.apache.flink.api.common.JobStatus.RESTARTING;
 import static org.apache.flink.api.common.JobStatus.RUNNING;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -501,6 +502,7 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
     @ValueSource(booleans = {true, false})
     public void testLastStateMaxCheckpointAge(boolean cancellable) throws 
Exception {
         var deployment = TestUtils.buildApplicationCluster();
+        
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         deployment
                 .getSpec()
                 .getFlinkConfiguration()
@@ -643,6 +645,7 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
         jobStatus.setJobId(new JobID().toString());
 
         // Running state, savepoint if possible
+        
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         jobStatus.setState(RUNNING);
         var ctx = getResourceContext(deployment);
         var deployConf = ctx.getDeployConfig(deployment.getSpec());
@@ -654,6 +657,7 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
                 jobReconciler.getJobUpgrade(ctx, deployConf));
 
         // Not running (but cancellable)
+        
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         jobStatus.setState(RESTARTING);
         assertEquals(
                 AbstractJobReconciler.JobUpgrade.lastStateUsingCancel(),
@@ -667,17 +671,26 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
     }
 
     private static Stream<Arguments> testLastStateCancelParams() {
-        return Stream.of(
-                Arguments.of(UpgradeMode.LAST_STATE, true),
-                Arguments.of(UpgradeMode.LAST_STATE, false),
-                Arguments.of(UpgradeMode.SAVEPOINT, true),
-                Arguments.of(UpgradeMode.SAVEPOINT, false));
+        var out = new ArrayList<Arguments>();
+        for (var upgradeMode : List.of(UpgradeMode.SAVEPOINT, 
UpgradeMode.LAST_STATE)) {
+            for (boolean allowFallback : List.of(true, false)) {
+                for (var jmStatus : JobManagerDeploymentStatus.values()) {
+                    out.add(Arguments.of(upgradeMode, allowFallback, 
jmStatus));
+                }
+            }
+        }
+        return out.stream();
     }
 
     @ParameterizedTest
     @MethodSource("testLastStateCancelParams")
-    public void testLastStateNoHaMeta(UpgradeMode upgradeMode, boolean 
allowFallback)
+    public void testLastStateNoHaMeta(
+            UpgradeMode upgradeMode, boolean allowFallback, 
JobManagerDeploymentStatus jmStatus)
             throws Exception {
+        if (upgradeMode == UpgradeMode.LAST_STATE && !allowFallback) {
+            // This cannot happen
+            return;
+        }
         var jobReconciler = (ApplicationReconciler) 
this.reconciler.getReconciler();
         var deployment = TestUtils.buildApplicationCluster();
         deployment
@@ -702,6 +715,8 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
 
         // Set job status to running
         var jobStatus = deployment.getStatus().getJobStatus();
+        deployment.getStatus().setJobManagerDeploymentStatus(jmStatus);
+
         long now = System.currentTimeMillis();
 
         jobStatus.setStartTime(Long.toString(now));
@@ -712,15 +727,21 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
         var ctx = getResourceContext(deployment);
         var deployConf = ctx.getDeployConfig(deployment.getSpec());
 
-        if (upgradeMode == UpgradeMode.LAST_STATE) {
-            assertEquals(
-                    AbstractJobReconciler.JobUpgrade.lastStateUsingCancel(),
-                    jobReconciler.getJobUpgrade(ctx, deployConf));
+        if (List.of(JobManagerDeploymentStatus.ERROR, 
JobManagerDeploymentStatus.MISSING)
+                .contains(jmStatus)) {
+            assertThatThrownBy(() -> jobReconciler.getJobUpgrade(ctx, 
deployConf))
+                    .isInstanceOf(UpgradeFailureException.class)
+                    .hasMessageContaining(
+                            "JobManager deployment is missing and HA metadata 
is not available");
         } else {
+            boolean immediatelyCancellable =
+                    (upgradeMode == UpgradeMode.LAST_STATE || allowFallback)
+                            && jmStatus == JobManagerDeploymentStatus.READY;
             assertEquals(
-                    allowFallback
+                    immediatelyCancellable
                             ? 
AbstractJobReconciler.JobUpgrade.lastStateUsingCancel()
-                            : 
AbstractJobReconciler.JobUpgrade.pendingUpgrade(),
+                            : new AbstractJobReconciler.JobUpgrade(
+                                    null, null, false, allowFallback, true),
                     jobReconciler.getJobUpgrade(ctx, deployConf));
         }
     }

Reply via email to