This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 49debb1  [FLINK-26830] Transition to SUSPEND before redeploying on job upgrades
49debb1 is described below

commit 49debb14ef530cae79d5d75f81029629a6afed4c
Author: Gyula Fora <[email protected]>
AuthorDate: Wed Mar 23 15:31:06 2022 +0100

    [FLINK-26830] Transition to SUSPEND before redeploying on job upgrades
---
 .../controller/FlinkDeploymentController.java      | 12 ++----
 .../operator/reconciler/JobReconciler.java         | 32 ++++++---------
 .../operator/reconciler/ReconciliationUtils.java   | 41 +++++++++++++++++--
 .../operator/reconciler/SessionReconciler.java     |  2 +-
 .../controller/FlinkDeploymentControllerTest.java  | 26 +++++++++++-
 .../operator/reconciler/JobReconcilerTest.java     | 10 +++++
 .../operator/utils/ReconciliationUtilsTest.java    | 46 ++++++++++++++++------
 7 files changed, 125 insertions(+), 44 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index aad86a1..6e4e587 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -47,7 +47,6 @@ import 
io.javaoperatorsdk.operator.processing.event.source.EventSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -112,7 +111,8 @@ public class FlinkDeploymentController
         if (validationError.isPresent()) {
             LOG.error("Validation failed: " + validationError.get());
             ReconciliationUtils.updateForReconciliationError(flinkApp, 
validationError.get());
-            return ReconciliationUtils.toUpdateControl(originalCopy, flinkApp);
+            return ReconciliationUtils.toUpdateControl(
+                    operatorConfiguration, originalCopy, flinkApp, false);
         }
 
         Configuration effectiveConfig =
@@ -127,12 +127,8 @@ public class FlinkDeploymentController
             throw new ReconciliationException(e);
         }
 
-        Duration rescheduleAfter =
-                flinkApp.getStatus()
-                        .getJobManagerDeploymentStatus()
-                        .rescheduleAfter(flinkApp, operatorConfiguration);
-        return ReconciliationUtils.toUpdateControl(originalCopy, flinkApp)
-                .rescheduleAfter(rescheduleAfter.toMillis());
+        return ReconciliationUtils.toUpdateControl(
+                operatorConfiguration, originalCopy, flinkApp, true);
     }
 
     private void handleDeploymentFailed(FlinkDeployment flinkApp, 
DeploymentFailedException dfe) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 0eb7608..608ebeb 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -72,7 +72,7 @@ public class JobReconciler extends BaseReconciler {
                     effectiveConfig,
                     Optional.ofNullable(jobSpec.getInitialSavepointPath()));
             IngressUtils.updateIngressRules(flinkApp, effectiveConfig, 
kubernetesClient);
-            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, 
JobState.RUNNING);
             return;
         }
 
@@ -87,14 +87,14 @@ public class JobReconciler extends BaseReconciler {
             JobState desiredJobState = jobSpec.getState();
 
             UpgradeMode upgradeMode = jobSpec.getUpgradeMode();
+            JobState stateAfterReconcile = currentJobState;
             if (currentJobState == JobState.RUNNING) {
                 if (desiredJobState == JobState.RUNNING) {
-                    upgradeFlinkJob(flinkApp, effectiveConfig);
-                }
-                if (desiredJobState.equals(JobState.SUSPENDED)) {
-                    printCancelLogs(upgradeMode, 
flinkApp.getMetadata().getName());
-                    suspendJob(flinkApp, upgradeMode, effectiveConfig);
+                    LOG.info("Upgrading running job, suspending first...");
                 }
+                printCancelLogs(upgradeMode, flinkApp.getMetadata().getName());
+                suspendJob(flinkApp, upgradeMode, effectiveConfig);
+                stateAfterReconcile = JobState.SUSPENDED;
             }
             if (currentJobState == JobState.SUSPENDED && desiredJobState == 
JobState.RUNNING) {
                 if (upgradeMode == UpgradeMode.STATELESS) {
@@ -103,9 +103,10 @@ public class JobReconciler extends BaseReconciler {
                         || upgradeMode == UpgradeMode.SAVEPOINT) {
                     restoreFromLastSavepoint(flinkApp, effectiveConfig);
                 }
+                stateAfterReconcile = JobState.RUNNING;
             }
             IngressUtils.updateIngressRules(flinkApp, effectiveConfig, 
kubernetesClient);
-            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, 
stateAfterReconcile);
         } else if (SavepointUtils.shouldTriggerSavepoint(flinkApp) && 
isJobRunning(flinkApp)) {
             triggerSavepoint(flinkApp, effectiveConfig);
             ReconciliationUtils.updateSavepointReconciliationSuccess(flinkApp);
@@ -144,21 +145,14 @@ public class JobReconciler extends BaseReconciler {
         
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
     }
 
-    private void upgradeFlinkJob(FlinkDeployment flinkApp, Configuration 
effectiveConfig)
-            throws Exception {
-        LOG.info("Upgrading running job");
-        final Optional<String> savepoint =
-                suspendJob(flinkApp, 
flinkApp.getSpec().getJob().getUpgradeMode(), effectiveConfig);
-        deployFlinkJob(flinkApp, effectiveConfig, savepoint);
-    }
-
     private void restoreFromLastSavepoint(FlinkDeployment flinkApp, 
Configuration effectiveConfig)
             throws Exception {
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
-        deployFlinkJob(
-                flinkApp,
-                effectiveConfig,
-                
Optional.of(jobStatus.getSavepointInfo().getLastSavepoint().getLocation()));
+        Optional<String> savepointOpt =
+                
Optional.ofNullable(jobStatus.getSavepointInfo().getLastSavepoint())
+                        .flatMap(s -> Optional.ofNullable(s.getLocation()));
+
+        deployFlinkJob(flinkApp, effectiveConfig, savepointOpt);
     }
 
     private void printCancelLogs(UpgradeMode upgradeMode, String name) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index f7a19c5..219ebbd 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -17,14 +17,17 @@
 
 package org.apache.flink.kubernetes.operator.reconciler;
 
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 
+import java.time.Duration;
 import java.util.Objects;
 
 /** Reconciliation utilities. */
@@ -32,7 +35,8 @@ public class ReconciliationUtils {
 
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
-    public static void updateForSpecReconciliationSuccess(FlinkDeployment 
flinkApp) {
+    public static void updateForSpecReconciliationSuccess(
+            FlinkDeployment flinkApp, JobState stateAfterReconcile) {
         ReconciliationStatus reconciliationStatus = 
flinkApp.getStatus().getReconciliationStatus();
         reconciliationStatus.setSuccess(true);
         reconciliationStatus.setError(null);
@@ -45,6 +49,7 @@ public class ReconciliationUtils {
                             .getJob()
                             .getSavepointTriggerNonce();
             
clonedSpec.getJob().setSavepointTriggerNonce(oldSavepointTriggerNonce);
+            clonedSpec.getJob().setState(stateAfterReconcile);
         }
         reconciliationStatus.setLastReconciledSpec(clonedSpec);
     }
@@ -79,7 +84,10 @@ public class ReconciliationUtils {
     }
 
     public static UpdateControl<FlinkDeployment> toUpdateControl(
-            FlinkDeployment originalCopy, FlinkDeployment current) {
+            FlinkOperatorConfiguration operatorConfiguration,
+            FlinkDeployment originalCopy,
+            FlinkDeployment current,
+            boolean reschedule) {
         UpdateControl<FlinkDeployment> updateControl;
         if (!Objects.equals(originalCopy.getSpec(), current.getSpec())) {
             throw new UnsupportedOperationException(
@@ -93,6 +101,33 @@ public class ReconciliationUtils {
         } else {
             updateControl = UpdateControl.noUpdate();
         }
-        return updateControl;
+
+        if (!reschedule) {
+            return updateControl;
+        }
+
+        if (isJobUpgradeInProgress(current)) {
+            return updateControl.rescheduleAfter(0);
+        }
+
+        Duration rescheduleAfter =
+                current.getStatus()
+                        .getJobManagerDeploymentStatus()
+                        .rescheduleAfter(current, operatorConfiguration);
+
+        return updateControl.rescheduleAfter(rescheduleAfter.toMillis());
+    }
+
+    private static boolean isJobUpgradeInProgress(FlinkDeployment current) {
+        ReconciliationStatus reconciliationStatus = 
current.getStatus().getReconciliationStatus();
+
+        if (reconciliationStatus == null || current.getSpec().getJob() == 
null) {
+            return false;
+        }
+
+        return current.getSpec().getJob().getState() == JobState.RUNNING
+                && reconciliationStatus.isSuccess()
+                && 
reconciliationStatus.getLastReconciledSpec().getJob().getState()
+                        == JobState.SUSPENDED;
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index 75ce176..7192b12 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -64,7 +64,7 @@ public class SessionReconciler extends BaseReconciler {
             upgradeSessionCluster(flinkApp, effectiveConfig);
             IngressUtils.updateIngressRules(flinkApp, effectiveConfig, 
kubernetesClient);
         }
-        ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
+        ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, null);
     }
 
     private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration 
effectiveConfig)
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 96eedd0..e984bf6 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -283,7 +283,17 @@ public class FlinkDeploymentControllerTest {
         appCluster = ReconciliationUtils.clone(appCluster);
         appCluster.getSpec().getJob().setParallelism(100);
 
-        testController.reconcile(appCluster, context);
+        assertEquals(0, testController.reconcile(appCluster, 
context).getScheduleDelay().get());
+        assertEquals(
+                JobState.SUSPENDED,
+                appCluster
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .getLastReconciledSpec()
+                        .getJob()
+                        .getState());
+
+        testController.reconcile(appCluster, TestUtils.createEmptyContext());
         jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals("savepoint_0", jobs.get(0).f0);
@@ -331,11 +341,23 @@ public class FlinkDeploymentControllerTest {
 
         UpdateControl<FlinkDeployment> updateControl =
                 testController.reconcile(appCluster, context);
+        assertEquals(0, updateControl.getScheduleDelay().get());
+        assertEquals(
+                JobState.SUSPENDED,
+                appCluster
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .getLastReconciledSpec()
+                        .getJob()
+                        .getState());
+
+        updateControl = testController.reconcile(appCluster, context);
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYING
                         .rescheduleAfter(appCluster, operatorConfiguration)
                         .toMillis(),
                 updateControl.getScheduleDelay().get());
+
         testController.reconcile(appCluster, context);
         jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
@@ -404,7 +426,6 @@ public class FlinkDeploymentControllerTest {
             flinkService.setPortReady(true);
             testController.reconcile(appCluster, context);
             testController.reconcile(appCluster, context);
-
             if (appCluster.getSpec().getJob() != null) {
                 assertEquals("RUNNING", 
appCluster.getStatus().getJobStatus().getState());
             } else {
@@ -424,6 +445,7 @@ public class FlinkDeploymentControllerTest {
             flinkService.setPortReady(true);
             testController.reconcile(appCluster, context);
             testController.reconcile(appCluster, context);
+            testController.reconcile(appCluster, 
TestUtils.createEmptyContext());
 
             assertEquals(
                     JobManagerDeploymentStatus.DEPLOYING,
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index 691ad67..d201961 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -67,6 +67,11 @@ public class JobReconcilerTest {
         reconciler.reconcile(statelessUpgrade, context, config);
 
         runningJobs = flinkService.listJobs();
+        assertEquals(0, runningJobs.size());
+
+        reconciler.reconcile(statelessUpgrade, context, config);
+
+        runningJobs = flinkService.listJobs();
         assertEquals(1, runningJobs.size());
         assertNull(runningJobs.get(0).f0);
 
@@ -83,6 +88,11 @@ public class JobReconcilerTest {
         reconciler.reconcile(statefulUpgrade, context, new 
Configuration(config));
 
         runningJobs = flinkService.listJobs();
+        assertEquals(0, runningJobs.size());
+
+        reconciler.reconcile(statefulUpgrade, context, new 
Configuration(config));
+
+        runningJobs = flinkService.listJobs();
         assertEquals(1, runningJobs.size());
         assertEquals("savepoint_0", runningJobs.get(0).f0);
     }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
index 1558e12..76bd26a 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
@@ -17,16 +17,17 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import org.junit.jupiter.api.Test;
 
-import java.util.concurrent.TimeUnit;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -35,26 +36,30 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /** Test for {@link 
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils}. */
 public class ReconciliationUtilsTest {
 
+    FlinkOperatorConfiguration operatorConfiguration =
+            FlinkOperatorConfiguration.fromConfiguration(new Configuration());
+
     @Test
     public void testSpecChangedException() {
         FlinkDeployment previous = TestUtils.buildApplicationCluster();
-        FlinkDeployment current =
-                
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils.clone(previous);
+        FlinkDeployment current = ReconciliationUtils.clone(previous);
 
         current.getSpec().setImage("changed-image");
         assertThrows(
                 UnsupportedOperationException.class,
-                () -> ReconciliationUtils.toUpdateControl(previous, current));
+                () ->
+                        ReconciliationUtils.toUpdateControl(
+                                operatorConfiguration, previous, current, 
true));
     }
 
     @Test
     public void testStatusChanged() {
         FlinkDeployment previous = TestUtils.buildApplicationCluster();
-        FlinkDeployment current =
-                
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils.clone(previous);
+        FlinkDeployment current = ReconciliationUtils.clone(previous);
 
         UpdateControl<FlinkDeployment> updateControl =
-                ReconciliationUtils.toUpdateControl(previous, current);
+                ReconciliationUtils.toUpdateControl(
+                        operatorConfiguration, previous, current, false);
 
         assertFalse(updateControl.isUpdateResource());
         assertFalse(updateControl.isUpdateStatus());
@@ -63,10 +68,29 @@ public class ReconciliationUtilsTest {
         // status changed
         
current.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
         updateControl =
-                ReconciliationUtils.toUpdateControl(previous, current)
-                        .rescheduleAfter(10, TimeUnit.MILLISECONDS);
+                ReconciliationUtils.toUpdateControl(operatorConfiguration, 
previous, current, true);
+        assertFalse(updateControl.isUpdateResource());
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                operatorConfiguration.getProgressCheckInterval().toMillis(),
+                updateControl.getScheduleDelay().get());
+    }
+
+    @Test
+    public void testRescheduleUpgradeImmediately() {
+        FlinkDeployment app = TestUtils.buildApplicationCluster();
+        app.getSpec().getJob().setState(JobState.RUNNING);
+        FlinkDeployment current = ReconciliationUtils.clone(app);
+        current.getStatus()
+                .getReconciliationStatus()
+                
.setLastReconciledSpec(ReconciliationUtils.clone(current.getSpec()));
+        ReconciliationUtils.updateForSpecReconciliationSuccess(current, 
JobState.SUSPENDED);
+
+        UpdateControl<FlinkDeployment> updateControl =
+                ReconciliationUtils.toUpdateControl(operatorConfiguration, 
app, current, true);
+
         assertFalse(updateControl.isUpdateResource());
         assertTrue(updateControl.isUpdateStatus());
-        assertEquals(10, updateControl.getScheduleDelay().get());
+        assertEquals(0, updateControl.getScheduleDelay().get());
     }
 }

Reply via email to