This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 61ff9f59 [FLINK-32973] Add capability to configure kubernetes.operator.job.savepoint-on-deletion on a per-resource level
61ff9f59 is described below

commit 61ff9f59c14e012ea0237da0c1cc2f7297483293
Author: Nicolas Fraison <nicolas.frai...@datadoghq.com>
AuthorDate: Fri Aug 25 16:55:09 2023 +0200

    [FLINK-32973] Add capability to configure kubernetes.operator.job.savepoint-on-deletion on a per-resource level
---
 .../config/FlinkOperatorConfiguration.java         |  7 +-----
 .../deployment/ApplicationReconciler.java          |  3 ++-
 .../sessionjob/SessionJobReconciler.java           |  5 +++-
 .../deployment/ApplicationReconcilerTest.java      | 28 ++++++++++++++++++++++
 .../sessionjob/SessionJobReconcilerTest.java       | 26 ++++++++++++++++++++
 5 files changed, 61 insertions(+), 8 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 40a6dcb5..d4ed1e79 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -71,7 +71,6 @@ public class FlinkOperatorConfiguration {
     String labelSelector;
     LeaderElectionConfiguration leaderElectionConfiguration;
     DeletionPropagation deletionPropagation;
-    boolean savepointOnDeletion;
 
     public static FlinkOperatorConfiguration fromConfiguration(Configuration 
operatorConfig) {
         Duration reconcileInterval =
@@ -182,9 +181,6 @@ public class FlinkOperatorConfiguration {
         DeletionPropagation deletionPropagation =
                 
operatorConfig.get(KubernetesOperatorConfigOptions.RESOURCE_DELETION_PROPAGATION);
 
-        boolean savepointOnDeletion =
-                
operatorConfig.get(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION);
-
         return new FlinkOperatorConfiguration(
                 reconcileInterval,
                 reconcilerMaxParallelism,
@@ -211,8 +207,7 @@ public class FlinkOperatorConfiguration {
                 exceptionLabelMapper,
                 labelSelector,
                 getLeaderElectionConfig(operatorConfig),
-                deletionPropagation,
-                savepointOnDeletion);
+                deletionPropagation);
     }
 
     private static LeaderElectionConfiguration 
getLeaderElectionConfig(Configuration conf) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 284a178b..bf80e287 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -361,8 +361,9 @@ public class ApplicationReconciler
             ctx.getFlinkService()
                     .deleteClusterDeployment(deployment.getMetadata(), status, 
conf, true);
         } else {
+            var observeConfig = ctx.getObserveConfig();
             UpgradeMode upgradeMode =
-                    ctx.getOperatorConfig().isSavepointOnDeletion()
+                    
observeConfig.getBoolean(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION)
                             ? UpgradeMode.SAVEPOINT
                             : UpgradeMode.STATELESS;
             cancelJob(ctx, upgradeMode);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index 701608b5..6ac7c373 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
 import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory;
@@ -102,8 +103,10 @@ public class SessionJobReconciler
             String jobID = 
ctx.getResource().getStatus().getJobStatus().getJobId();
             if (jobID != null) {
                 try {
+                    var observeConfig = ctx.getObserveConfig();
                     UpgradeMode upgradeMode =
-                            ctx.getOperatorConfig().isSavepointOnDeletion()
+                            observeConfig.getBoolean(
+                                            
KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION)
                                     ? UpgradeMode.SAVEPOINT
                                     : UpgradeMode.STATELESS;
                     cancelJob(ctx, upgradeMode);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index e5a1ab35..3c272d04 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -171,6 +171,34 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
                         .getLocation());
     }
 
+    @ParameterizedTest
+    
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void testSubmitAndCleanUpWithSavepointOnResource(FlinkVersion 
flinkVersion)
+            throws Exception {
+        FlinkDeployment deployment = 
TestUtils.buildApplicationCluster(flinkVersion);
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                
.put(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION.key(), "true");
+
+        // session ready
+        reconciler.reconcile(deployment, 
TestUtils.createContextWithReadyFlinkDeployment());
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+        // clean up
+        assertEquals(
+                null, 
deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
+        reconciler.cleanup(deployment, 
TestUtils.createContextWithReadyFlinkDeployment());
+        assertEquals(
+                "savepoint_0",
+                deployment
+                        .getStatus()
+                        .getJobStatus()
+                        .getSavepointInfo()
+                        .getLastSavepoint()
+                        .getLocation());
+    }
+
     @ParameterizedTest
     
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index 3c057fff..18f8dce7 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -122,6 +122,32 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
                         .getLocation());
     }
 
+    @Test
+    public void testSubmitAndCleanUpWithSavepointOnResource() throws Exception 
{
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+        sessionJob
+                .getSpec()
+                .getFlinkConfiguration()
+                
.put(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION.key(), "true");
+
+        // session ready
+        reconciler.reconcile(sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment());
+        assertEquals(1, flinkService.listJobs().size());
+        verifyAndSetRunningJobsToStatus(
+                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+
+        // clean up
+        reconciler.cleanup(sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment());
+        assertEquals(
+                "savepoint_0",
+                sessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .getSavepointInfo()
+                        .getLastSavepoint()
+                        .getLocation());
+    }
+
     @Test
     public void testSubmitAndCleanUp() throws Exception {
         FlinkSessionJob sessionJob = TestUtils.buildSessionJob();

Reply via email to