This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 61ff9f59 [FLINK-32973] Add capability to configure kubernetes.operator.job.savepoint-on-deletion on a per-resource level 61ff9f59 is described below commit 61ff9f59c14e012ea0237da0c1cc2f7297483293 Author: Nicolas Fraison <nicolas.frai...@datadoghq.com> AuthorDate: Fri Aug 25 16:55:09 2023 +0200 [FLINK-32973] Add capability to configure kubernetes.operator.job.savepoint-on-deletion on a per-resource level --- .../config/FlinkOperatorConfiguration.java | 7 +----- .../deployment/ApplicationReconciler.java | 3 ++- .../sessionjob/SessionJobReconciler.java | 5 +++- .../deployment/ApplicationReconcilerTest.java | 28 ++++++++++++++++++++++ .../sessionjob/SessionJobReconcilerTest.java | 26 ++++++++++++++++++++ 5 files changed, 61 insertions(+), 8 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java index 40a6dcb5..d4ed1e79 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java @@ -71,7 +71,6 @@ public class FlinkOperatorConfiguration { String labelSelector; LeaderElectionConfiguration leaderElectionConfiguration; DeletionPropagation deletionPropagation; - boolean savepointOnDeletion; public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) { Duration reconcileInterval = @@ -182,9 +181,6 @@ public class FlinkOperatorConfiguration { DeletionPropagation deletionPropagation = operatorConfig.get(KubernetesOperatorConfigOptions.RESOURCE_DELETION_PROPAGATION); - boolean savepointOnDeletion = - operatorConfig.get(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION); - return new FlinkOperatorConfiguration( reconcileInterval, reconcilerMaxParallelism, @@ -211,8 +207,7 @@ public class FlinkOperatorConfiguration { exceptionLabelMapper, labelSelector, getLeaderElectionConfig(operatorConfig), - deletionPropagation, - savepointOnDeletion); + deletionPropagation); } private static LeaderElectionConfiguration getLeaderElectionConfig(Configuration conf) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index 284a178b..bf80e287 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -361,8 +361,9 @@ public class ApplicationReconciler ctx.getFlinkService() .deleteClusterDeployment(deployment.getMetadata(), status, conf, true); } else { + var observeConfig = ctx.getObserveConfig(); UpgradeMode upgradeMode = - ctx.getOperatorConfig().isSavepointOnDeletion() + observeConfig.getBoolean(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION) ? UpgradeMode.SAVEPOINT : UpgradeMode.STATELESS; cancelJob(ctx, upgradeMode); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java index 701608b5..6ac7c373 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java @@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler; import org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory; @@ -102,8 +103,10 @@ public class SessionJobReconciler String jobID = ctx.getResource().getStatus().getJobStatus().getJobId(); if (jobID != null) { try { + var observeConfig = ctx.getObserveConfig(); UpgradeMode upgradeMode = - ctx.getOperatorConfig().isSavepointOnDeletion() + observeConfig.getBoolean( + KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION) ? UpgradeMode.SAVEPOINT : UpgradeMode.STATELESS; cancelJob(ctx, upgradeMode); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java index e5a1ab35..3c272d04 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java @@ -171,6 +171,34 @@ public class ApplicationReconcilerTest extends OperatorTestBase { .getLocation()); } + @ParameterizedTest + @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") + public void testSubmitAndCleanUpWithSavepointOnResource(FlinkVersion flinkVersion) + throws Exception { + FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion); + deployment + .getSpec() + .getFlinkConfiguration() + .put(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION.key(), "true"); + + // session ready + reconciler.reconcile(deployment, TestUtils.createContextWithReadyFlinkDeployment()); + verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs()); + + // clean up + assertEquals( + null, deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint()); + reconciler.cleanup(deployment, TestUtils.createContextWithReadyFlinkDeployment()); + assertEquals( + "savepoint_0", + deployment + .getStatus() + .getJobStatus() + .getSavepointInfo() + .getLastSavepoint() + .getLocation()); + } + @ParameterizedTest @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") public void testUpgrade(FlinkVersion flinkVersion) throws Exception { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java index 3c057fff..18f8dce7 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java @@ -122,6 +122,32 @@ public class SessionJobReconcilerTest extends OperatorTestBase { .getLocation()); } + @Test + public void testSubmitAndCleanUpWithSavepointOnResource() throws Exception { + FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); + sessionJob + .getSpec() + .getFlinkConfiguration() + .put(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION.key(), "true"); + + // session ready + reconciler.reconcile(sessionJob, TestUtils.createContextWithReadyFlinkDeployment()); + assertEquals(1, flinkService.listJobs().size()); + verifyAndSetRunningJobsToStatus( + sessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs()); + + // clean up + reconciler.cleanup(sessionJob, TestUtils.createContextWithReadyFlinkDeployment()); + assertEquals( + "savepoint_0", + sessionJob + .getStatus() + .getJobStatus() + .getSavepointInfo() + .getLastSavepoint() + .getLocation()); + } + @Test public void testSubmitAndCleanUp() throws Exception { FlinkSessionJob sessionJob = TestUtils.buildSessionJob();