This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 18903878 [FLINK-32551] Add option to take a savepoint on
FlinkDeployment/FlinkSessionJob deletion
18903878 is described below
commit 18903878cbeff57fa1ad10ccd66cec3d68de7aa2
Author: oleksandr.nitavskyi <[email protected]>
AuthorDate: Wed Jul 19 14:45:10 2023 +0200
[FLINK-32551] Add option to take a savepoint on
FlinkDeployment/FlinkSessionJob deletion
---
.../shortcodes/generated/dynamic_section.html | 6 +++++
.../kubernetes_operator_config_configuration.html | 6 +++++
.../flink/kubernetes/operator/FlinkOperator.java | 3 +--
.../config/FlinkOperatorConfiguration.java | 7 +++++-
.../config/KubernetesOperatorConfigOptions.java | 8 ++++++
.../deployment/ApplicationReconciler.java | 7 ++++--
.../reconciler/deployment/ReconcilerFactory.java | 5 +---
.../reconciler/deployment/SessionReconciler.java | 6 +----
.../sessionjob/SessionJobReconciler.java | 13 +++++-----
.../kubernetes/operator/OperatorTestBase.java | 3 ++-
.../TestingFlinkSessionJobController.java | 3 +--
.../sessionjob/FlinkSessionJobObserverTest.java | 3 +--
.../deployment/ApplicationReconcilerTest.java | 27 ++++++++++++++++++++
.../deployment/SessionReconcilerTest.java | 3 +--
.../sessionjob/SessionJobReconcilerTest.java | 29 ++++++++++++++++++++--
15 files changed, 99 insertions(+), 30 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html
b/docs/layouts/shortcodes/generated/dynamic_section.html
index 45ecc3f3..7faf191c 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -80,6 +80,12 @@
<td>Boolean</td>
<td>Whether to restart failed jobs.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.job.savepoint-on-deletion</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Indicate whether a savepoint must be taken when deleting a
FlinkDeployment or FlinkSessionJob.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.job.upgrade.ignore-pending-savepoint</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index b3604b80..b6851ad9 100644
---
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -164,6 +164,12 @@
<td>Boolean</td>
<td>Whether to restart failed jobs.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.job.savepoint-on-deletion</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Indicate whether a savepoint must be taken when deleting a
FlinkDeployment or FlinkSessionJob.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.job.upgrade.ignore-pending-savepoint</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 80b0efd2..80ae70bb 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -187,8 +187,7 @@ public class FlinkOperator {
var metricManager =
MetricManager.createFlinkSessionJobMetricManager(baseConfig,
metricGroup);
var statusRecorder = StatusRecorder.create(client, metricManager,
listeners);
- var reconciler =
- new SessionJobReconciler(client, eventRecorder,
statusRecorder, configManager);
+ var reconciler = new SessionJobReconciler(client, eventRecorder,
statusRecorder);
var observer = new FlinkSessionJobObserver(eventRecorder);
var canaryResourceManager =
new CanaryResourceManager<FlinkSessionJob>(configManager,
client);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index d4ed1e79..40a6dcb5 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -71,6 +71,7 @@ public class FlinkOperatorConfiguration {
String labelSelector;
LeaderElectionConfiguration leaderElectionConfiguration;
DeletionPropagation deletionPropagation;
+ boolean savepointOnDeletion;
public static FlinkOperatorConfiguration fromConfiguration(Configuration
operatorConfig) {
Duration reconcileInterval =
@@ -181,6 +182,9 @@ public class FlinkOperatorConfiguration {
DeletionPropagation deletionPropagation =
operatorConfig.get(KubernetesOperatorConfigOptions.RESOURCE_DELETION_PROPAGATION);
+ boolean savepointOnDeletion =
+
operatorConfig.get(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION);
+
return new FlinkOperatorConfiguration(
reconcileInterval,
reconcilerMaxParallelism,
@@ -207,7 +211,8 @@ public class FlinkOperatorConfiguration {
exceptionLabelMapper,
labelSelector,
getLeaderElectionConfig(operatorConfig),
- deletionPropagation);
+ deletionPropagation,
+ savepointOnDeletion);
}
private static LeaderElectionConfiguration
getLeaderElectionConfig(Configuration conf) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index afd391aa..d1fb4422 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -525,4 +525,12 @@ public class KubernetesOperatorConfigOptions {
.enumType(DeletionPropagation.class)
.defaultValue(DeletionPropagation.FOREGROUND)
.withDescription("JM/TM Deployment deletion propagation.");
+
+ @Documentation.Section(SECTION_DYNAMIC)
+ public static final ConfigOption<Boolean> SAVEPOINT_ON_DELETION =
+ operatorConfig("job.savepoint-on-deletion")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Indicate whether a savepoint must be taken when
deleting a FlinkDeployment or FlinkSessionJob.");
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 346accaa..95b69f22 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -362,8 +362,11 @@ public class ApplicationReconciler
ctx.getFlinkService()
.deleteClusterDeployment(deployment.getMetadata(), status,
conf, true);
} else {
- ctx.getFlinkService()
- .cancelJob(deployment, UpgradeMode.STATELESS,
ctx.getObserveConfig());
+ UpgradeMode upgradeMode =
+ ctx.getOperatorConfig().isSavepointOnDeletion()
+ ? UpgradeMode.SAVEPOINT
+ : UpgradeMode.STATELESS;
+ cancelJob(ctx, upgradeMode);
}
return DeleteControl.defaultDelete();
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index 655b3e2f..d1b3863e 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -66,10 +66,7 @@ public class ReconcilerFactory {
switch (modes.f0) {
case SESSION:
return new SessionReconciler(
- kubernetesClient,
- eventRecorder,
- deploymentStatusRecorder,
- configManager);
+ kubernetesClient, eventRecorder,
deploymentStatusRecorder);
case APPLICATION:
return new ApplicationReconciler(
kubernetesClient,
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 6cac4533..0154e639 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -23,7 +23,6 @@ import
org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -48,15 +47,12 @@ public class SessionReconciler
FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> {
private static final Logger LOG =
LoggerFactory.getLogger(SessionReconciler.class);
- private final FlinkConfigManager configManager;
public SessionReconciler(
KubernetesClient kubernetesClient,
EventRecorder eventRecorder,
- StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>
statusRecorder,
- FlinkConfigManager configManager) {
+ StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>
statusRecorder) {
super(kubernetesClient, eventRecorder, statusRecorder, new
NoopJobAutoscalerFactory());
- this.configManager = configManager;
}
@Override
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index 47a72d09..701608b5 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -24,7 +24,6 @@ import
org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
import
org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory;
@@ -47,15 +46,11 @@ public class SessionJobReconciler
private static final Logger LOG =
LoggerFactory.getLogger(SessionJobReconciler.class);
- private final FlinkConfigManager configManager;
-
public SessionJobReconciler(
KubernetesClient kubernetesClient,
EventRecorder eventRecorder,
- StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus>
statusRecorder,
- FlinkConfigManager configManager) {
+ StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus>
statusRecorder) {
super(kubernetesClient, eventRecorder, statusRecorder, new
NoopJobAutoscalerFactory());
- this.configManager = configManager;
}
@Override
@@ -107,7 +102,11 @@ public class SessionJobReconciler
String jobID =
ctx.getResource().getStatus().getJobStatus().getJobId();
if (jobID != null) {
try {
- cancelJob(ctx, UpgradeMode.STATELESS);
+ UpgradeMode upgradeMode =
+ ctx.getOperatorConfig().isSavepointOnDeletion()
+ ? UpgradeMode.SAVEPOINT
+ : UpgradeMode.STATELESS;
+ cancelJob(ctx, upgradeMode);
} catch (ExecutionException e) {
final var cause = e.getCause();
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java
index 592f15b9..8cdd3250 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java
@@ -33,7 +33,8 @@ import org.junit.jupiter.api.BeforeEach;
/** @link JobStatusObserver unit tests */
public abstract class OperatorTestBase {
- protected FlinkConfigManager configManager = new FlinkConfigManager(new
Configuration());
+ protected Configuration conf = new Configuration();
+ protected FlinkConfigManager configManager = new FlinkConfigManager(conf);
protected TestingFlinkService flinkService;
protected EventCollector eventCollector = new EventCollector();
protected EventRecorder eventRecorder;
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
index b6ea2c2b..bdc45034 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
@@ -95,8 +95,7 @@ public class TestingFlinkSessionJobController
new FlinkSessionJobController(
ValidatorUtils.discoverValidators(configManager),
ctxFactory,
- new SessionJobReconciler(
- kubernetesClient, eventRecorder,
statusRecorder, configManager),
+ new SessionJobReconciler(kubernetesClient,
eventRecorder, statusRecorder),
new FlinkSessionJobObserver(eventRecorder),
statusRecorder,
eventRecorder,
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
index a18a05fd..07054d6e 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
@@ -66,8 +66,7 @@ public class FlinkSessionJobObserverTest extends
OperatorTestBase {
reconciler =
new TestReconcilerAdapter<>(
this,
- new SessionJobReconciler(
- kubernetesClient, eventRecorder,
statusRecorder, configManager));
+ new SessionJobReconciler(kubernetesClient,
eventRecorder, statusRecorder));
}
@Test
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index f79dd596..e9f50e63 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -121,6 +121,33 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
executorService = Executors.newDirectExecutorService();
}
+ @ParameterizedTest
+
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+ public void testSubmitAndCleanUpWithSavepoint(FlinkVersion flinkVersion)
throws Exception {
+ var conf = configManager.getDefaultConfig();
+ conf.set(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION, true);
+ configManager.updateDefaultConfig(conf);
+
+ FlinkDeployment deployment =
TestUtils.buildApplicationCluster(flinkVersion);
+
+ // session ready
+ reconciler.reconcile(deployment,
TestUtils.createContextWithReadyFlinkDeployment());
+ verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+ // clean up
+ assertEquals(
+ null,
deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
+ reconciler.cleanup(deployment,
TestUtils.createContextWithReadyFlinkDeployment());
+ assertEquals(
+ "savepoint_0",
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .getLastSavepoint()
+ .getLocation());
+ }
+
@ParameterizedTest
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index a3af075a..a087e6e2 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -61,8 +61,7 @@ public class SessionReconcilerTest extends OperatorTestBase {
reconciler =
new TestReconcilerAdapter<>(
this,
- new SessionReconciler(
- kubernetesClient, eventRecorder,
statusRecorder, configManager));
+ new SessionReconciler(kubernetesClient, eventRecorder,
statusRecorder));
}
@Test
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index bfc00846..36569904 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -87,8 +87,33 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
reconciler =
new TestReconcilerAdapter<>(
this,
- new SessionJobReconciler(
- kubernetesClient, eventRecorder,
statusRecorder, configManager));
+ new SessionJobReconciler(kubernetesClient,
eventRecorder, statusRecorder));
+ }
+
+ @Test
+ public void testSubmitAndCleanUpWithSavepoint() throws Exception {
+ var conf = configManager.getDefaultConfig();
+ conf.set(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION, true);
+ configManager.updateDefaultConfig(conf);
+
+ FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+ // session ready
+ reconciler.reconcile(sessionJob,
TestUtils.createContextWithReadyFlinkDeployment());
+ assertEquals(1, flinkService.listJobs().size());
+ verifyAndSetRunningJobsToStatus(
+ sessionJob, JobState.RUNNING, RECONCILING.name(), null,
flinkService.listJobs());
+
+ // clean up
+ reconciler.cleanup(sessionJob,
TestUtils.createContextWithReadyFlinkDeployment());
+ assertEquals(
+ "savepoint_0",
+ sessionJob
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .getLastSavepoint()
+ .getLocation());
}
@Test