This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 18903878 [FLINK-32551] Add option to take a savepoint on flinkdeployment/flinksessionjob deletion
18903878 is described below

commit 18903878cbeff57fa1ad10ccd66cec3d68de7aa2
Author: oleksandr.nitavskyi <[email protected]>
AuthorDate: Wed Jul 19 14:45:10 2023 +0200

    [FLINK-32551] Add option to take a savepoint on flinkdeployment/flinksessionjob deletion
---
 .../shortcodes/generated/dynamic_section.html      |  6 +++++
 .../kubernetes_operator_config_configuration.html  |  6 +++++
 .../flink/kubernetes/operator/FlinkOperator.java   |  3 +--
 .../config/FlinkOperatorConfiguration.java         |  7 +++++-
 .../config/KubernetesOperatorConfigOptions.java    |  8 ++++++
 .../deployment/ApplicationReconciler.java          |  7 ++++--
 .../reconciler/deployment/ReconcilerFactory.java   |  5 +---
 .../reconciler/deployment/SessionReconciler.java   |  6 +----
 .../sessionjob/SessionJobReconciler.java           | 13 +++++-----
 .../kubernetes/operator/OperatorTestBase.java      |  3 ++-
 .../TestingFlinkSessionJobController.java          |  3 +--
 .../sessionjob/FlinkSessionJobObserverTest.java    |  3 +--
 .../deployment/ApplicationReconcilerTest.java      | 27 ++++++++++++++++++++
 .../deployment/SessionReconcilerTest.java          |  3 +--
 .../sessionjob/SessionJobReconcilerTest.java       | 29 ++++++++++++++++++++--
 15 files changed, 99 insertions(+), 30 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html 
b/docs/layouts/shortcodes/generated/dynamic_section.html
index 45ecc3f3..7faf191c 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -80,6 +80,12 @@
             <td>Boolean</td>
             <td>Whether to restart failed jobs.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.savepoint-on-deletion</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Indicate whether a savepoint must be taken when deleting a 
FlinkDeployment or FlinkSessionJob.</td>
+        </tr>
         <tr>
             
<td><h5>kubernetes.operator.job.upgrade.ignore-pending-savepoint</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index b3604b80..b6851ad9 100644
--- 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -164,6 +164,12 @@
             <td>Boolean</td>
             <td>Whether to restart failed jobs.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.savepoint-on-deletion</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Indicate whether a savepoint must be taken when deleting a 
FlinkDeployment or FlinkSessionJob.</td>
+        </tr>
         <tr>
             
<td><h5>kubernetes.operator.job.upgrade.ignore-pending-savepoint</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 80b0efd2..80ae70bb 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -187,8 +187,7 @@ public class FlinkOperator {
         var metricManager =
                 MetricManager.createFlinkSessionJobMetricManager(baseConfig, 
metricGroup);
         var statusRecorder = StatusRecorder.create(client, metricManager, 
listeners);
-        var reconciler =
-                new SessionJobReconciler(client, eventRecorder, 
statusRecorder, configManager);
+        var reconciler = new SessionJobReconciler(client, eventRecorder, 
statusRecorder);
         var observer = new FlinkSessionJobObserver(eventRecorder);
         var canaryResourceManager =
                 new CanaryResourceManager<FlinkSessionJob>(configManager, 
client);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index d4ed1e79..40a6dcb5 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -71,6 +71,7 @@ public class FlinkOperatorConfiguration {
     String labelSelector;
     LeaderElectionConfiguration leaderElectionConfiguration;
     DeletionPropagation deletionPropagation;
+    boolean savepointOnDeletion;
 
     public static FlinkOperatorConfiguration fromConfiguration(Configuration 
operatorConfig) {
         Duration reconcileInterval =
@@ -181,6 +182,9 @@ public class FlinkOperatorConfiguration {
         DeletionPropagation deletionPropagation =
                 
operatorConfig.get(KubernetesOperatorConfigOptions.RESOURCE_DELETION_PROPAGATION);
 
+        boolean savepointOnDeletion =
+                
operatorConfig.get(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION);
+
         return new FlinkOperatorConfiguration(
                 reconcileInterval,
                 reconcilerMaxParallelism,
@@ -207,7 +211,8 @@ public class FlinkOperatorConfiguration {
                 exceptionLabelMapper,
                 labelSelector,
                 getLeaderElectionConfig(operatorConfig),
-                deletionPropagation);
+                deletionPropagation,
+                savepointOnDeletion);
     }
 
     private static LeaderElectionConfiguration 
getLeaderElectionConfig(Configuration conf) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index afd391aa..d1fb4422 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -525,4 +525,12 @@ public class KubernetesOperatorConfigOptions {
                     .enumType(DeletionPropagation.class)
                     .defaultValue(DeletionPropagation.FOREGROUND)
                     .withDescription("JM/TM Deployment deletion propagation.");
+
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Boolean> SAVEPOINT_ON_DELETION =
+            operatorConfig("job.savepoint-on-deletion")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Indicate whether a savepoint must be taken when 
deleting a FlinkDeployment or FlinkSessionJob.");
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 346accaa..95b69f22 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -362,8 +362,11 @@ public class ApplicationReconciler
             ctx.getFlinkService()
                     .deleteClusterDeployment(deployment.getMetadata(), status, 
conf, true);
         } else {
-            ctx.getFlinkService()
-                    .cancelJob(deployment, UpgradeMode.STATELESS, 
ctx.getObserveConfig());
+            UpgradeMode upgradeMode =
+                    ctx.getOperatorConfig().isSavepointOnDeletion()
+                            ? UpgradeMode.SAVEPOINT
+                            : UpgradeMode.STATELESS;
+            cancelJob(ctx, upgradeMode);
         }
         return DeleteControl.defaultDelete();
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index 655b3e2f..d1b3863e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -66,10 +66,7 @@ public class ReconcilerFactory {
                     switch (modes.f0) {
                         case SESSION:
                             return new SessionReconciler(
-                                    kubernetesClient,
-                                    eventRecorder,
-                                    deploymentStatusRecorder,
-                                    configManager);
+                                    kubernetesClient, eventRecorder, 
deploymentStatusRecorder);
                         case APPLICATION:
                             return new ApplicationReconciler(
                                     kubernetesClient,
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 6cac4533..0154e639 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -48,15 +47,12 @@ public class SessionReconciler
                 FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SessionReconciler.class);
-    private final FlinkConfigManager configManager;
 
     public SessionReconciler(
             KubernetesClient kubernetesClient,
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder,
-            FlinkConfigManager configManager) {
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder) {
         super(kubernetesClient, eventRecorder, statusRecorder, new 
NoopJobAutoscalerFactory());
-        this.configManager = configManager;
     }
 
     @Override
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index 47a72d09..701608b5 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
 import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory;
@@ -47,15 +46,11 @@ public class SessionJobReconciler
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SessionJobReconciler.class);
 
-    private final FlinkConfigManager configManager;
-
     public SessionJobReconciler(
             KubernetesClient kubernetesClient,
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> 
statusRecorder,
-            FlinkConfigManager configManager) {
+            StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> 
statusRecorder) {
         super(kubernetesClient, eventRecorder, statusRecorder, new 
NoopJobAutoscalerFactory());
-        this.configManager = configManager;
     }
 
     @Override
@@ -107,7 +102,11 @@ public class SessionJobReconciler
             String jobID = 
ctx.getResource().getStatus().getJobStatus().getJobId();
             if (jobID != null) {
                 try {
-                    cancelJob(ctx, UpgradeMode.STATELESS);
+                    UpgradeMode upgradeMode =
+                            ctx.getOperatorConfig().isSavepointOnDeletion()
+                                    ? UpgradeMode.SAVEPOINT
+                                    : UpgradeMode.STATELESS;
+                    cancelJob(ctx, upgradeMode);
                 } catch (ExecutionException e) {
                     final var cause = e.getCause();
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java
index 592f15b9..8cdd3250 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java
@@ -33,7 +33,8 @@ import org.junit.jupiter.api.BeforeEach;
 /** @link JobStatusObserver unit tests */
 public abstract class OperatorTestBase {
 
-    protected FlinkConfigManager configManager = new FlinkConfigManager(new 
Configuration());
+    protected Configuration conf = new Configuration();
+    protected FlinkConfigManager configManager = new FlinkConfigManager(conf);
     protected TestingFlinkService flinkService;
     protected EventCollector eventCollector = new EventCollector();
     protected EventRecorder eventRecorder;
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
index b6ea2c2b..bdc45034 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
@@ -95,8 +95,7 @@ public class TestingFlinkSessionJobController
                 new FlinkSessionJobController(
                         ValidatorUtils.discoverValidators(configManager),
                         ctxFactory,
-                        new SessionJobReconciler(
-                                kubernetesClient, eventRecorder, 
statusRecorder, configManager),
+                        new SessionJobReconciler(kubernetesClient, 
eventRecorder, statusRecorder),
                         new FlinkSessionJobObserver(eventRecorder),
                         statusRecorder,
                         eventRecorder,
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
index a18a05fd..07054d6e 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
@@ -66,8 +66,7 @@ public class FlinkSessionJobObserverTest extends 
OperatorTestBase {
         reconciler =
                 new TestReconcilerAdapter<>(
                         this,
-                        new SessionJobReconciler(
-                                kubernetesClient, eventRecorder, 
statusRecorder, configManager));
+                        new SessionJobReconciler(kubernetesClient, 
eventRecorder, statusRecorder));
     }
 
     @Test
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index f79dd596..e9f50e63 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -121,6 +121,33 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         executorService = Executors.newDirectExecutorService();
     }
 
+    @ParameterizedTest
+    
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void testSubmitAndCleanUpWithSavepoint(FlinkVersion flinkVersion) 
throws Exception {
+        var conf = configManager.getDefaultConfig();
+        conf.set(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION, true);
+        configManager.updateDefaultConfig(conf);
+
+        FlinkDeployment deployment = 
TestUtils.buildApplicationCluster(flinkVersion);
+
+        // session ready
+        reconciler.reconcile(deployment, 
TestUtils.createContextWithReadyFlinkDeployment());
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+        // clean up
+        assertEquals(
+                null, 
deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
+        reconciler.cleanup(deployment, 
TestUtils.createContextWithReadyFlinkDeployment());
+        assertEquals(
+                "savepoint_0",
+                deployment
+                        .getStatus()
+                        .getJobStatus()
+                        .getSavepointInfo()
+                        .getLastSavepoint()
+                        .getLocation());
+    }
+
     @ParameterizedTest
     
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index a3af075a..a087e6e2 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -61,8 +61,7 @@ public class SessionReconcilerTest extends OperatorTestBase {
         reconciler =
                 new TestReconcilerAdapter<>(
                         this,
-                        new SessionReconciler(
-                                kubernetesClient, eventRecorder, 
statusRecorder, configManager));
+                        new SessionReconciler(kubernetesClient, eventRecorder, 
statusRecorder));
     }
 
     @Test
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index bfc00846..36569904 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -87,8 +87,33 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         reconciler =
                 new TestReconcilerAdapter<>(
                         this,
-                        new SessionJobReconciler(
-                                kubernetesClient, eventRecorder, 
statusRecorder, configManager));
+                        new SessionJobReconciler(kubernetesClient, 
eventRecorder, statusRecorder));
+    }
+
+    @Test
+    public void testSubmitAndCleanUpWithSavepoint() throws Exception {
+        var conf = configManager.getDefaultConfig();
+        conf.set(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION, true);
+        configManager.updateDefaultConfig(conf);
+
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        // session ready
+        reconciler.reconcile(sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment());
+        assertEquals(1, flinkService.listJobs().size());
+        verifyAndSetRunningJobsToStatus(
+                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+
+        // clean up
+        reconciler.cleanup(sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment());
+        assertEquals(
+                "savepoint_0",
+                sessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .getSavepointInfo()
+                        .getLastSavepoint()
+                        .getLocation());
     }
 
     @Test

Reply via email to