This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 70bb74c0 [FLINK-28763] Make savepoint format configurable for upgrades/savepoint operations
70bb74c0 is described below
commit 70bb74c06280985065f0ce56236186a146d4436f
Author: Nicholas Jiang <[email protected]>
AuthorDate: Tue Aug 9 03:36:08 2022 -0700
[FLINK-28763] Make savepoint format configurable for upgrades/savepoint operations
---
.../shortcodes/generated/dynamic_section.html | 6 ++
.../kubernetes_operator_config_configuration.html | 6 ++
.../config/KubernetesOperatorConfigOptions.java | 9 +++
.../operator/service/AbstractFlinkService.java | 14 ++--
.../kubernetes/operator/TestingClusterClient.java | 12 +++-
.../operator/service/NativeFlinkServiceTest.java | 75 ++++++++++++++++++++++
6 files changed, 117 insertions(+), 5 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html
b/docs/layouts/shortcodes/generated/dynamic_section.html
index 8d8fb620..4cfac307 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -44,6 +44,12 @@
<td>Duration</td>
<td>Interval at which periodic savepoints will be triggered. The
triggering schedule is not guaranteed, savepoints will be triggered as part of
the regular reconcile loop.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.savepoint.format.type</h5></td>
+ <td style="word-wrap: break-word;">CANONICAL</td>
+ <td><p>Enum</p></td>
+ <td>Type of the binary format in which a savepoint should be
taken.<br /><br />Possible values:<ul><li>"CANONICAL": A canonical, common for
all state backends format. It lets you switch state backends.</li><li>"NATIVE":
A format specific for the chosen state backend, in its native binary format.
Might be faster to take and restore from than the canonical one.</li></ul></td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.savepoint.history.max.age</h5></td>
<td style="word-wrap: break-word;">86400000 ms</td>
diff --git
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index f950054e..aaf1e936 100644
---
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -140,6 +140,12 @@
<td>Integer</td>
<td>Max attempts of automatic reconcile retries on recoverable
errors.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.savepoint.format.type</h5></td>
+ <td style="word-wrap: break-word;">CANONICAL</td>
+ <td><p>Enum</p></td>
+ <td>Type of the binary format in which a savepoint should be
taken.<br /><br />Possible values:<ul><li>"CANONICAL": A canonical, common for
all state backends format. It lets you switch state backends.</li><li>"NATIVE":
A format specific for the chosen state backend, in its native binary format.
Might be faster to take and restore from than the canonical one.</li></ul></td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.savepoint.history.max.age</h5></td>
<td style="word-wrap: break-word;">86400000 ms</td>
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 774882ac..11f84b7c 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -21,6 +21,7 @@ package org.apache.flink.kubernetes.operator.config;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
@@ -280,4 +281,12 @@ public class KubernetesOperatorConfigOptions {
.defaultValue(true)
.withDescription(
"Enables last-state fallback for savepoint upgrade
mode. When the job is not running thus savepoint cannot be triggered but HA
metadata is available for last state restore the operator can initiate the
upgrade process when the flag is enabled.");
+
+ @Documentation.Section(SECTION_DYNAMIC)
+ public static final ConfigOption<SavepointFormatType>
OPERATOR_SAVEPOINT_FORMAT_TYPE =
+ ConfigOptions.key("kubernetes.operator.savepoint.format.type")
+ .enumType(SavepointFormatType.class)
+ .defaultValue(SavepointFormatType.DEFAULT)
+ .withDescription(
+ "Type of the binary format in which a savepoint
should be taken.");
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 1a601446..bd316290 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -25,11 +25,11 @@ import
org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import
org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
@@ -291,7 +291,9 @@ public abstract class AbstractFlinkService implements
FlinkService {
conf.get(FLINK_VERSION)
.isNewerVersionThan(
FlinkVersion.v1_14)
- ?
SavepointFormatType.DEFAULT
+ ? conf.get(
+
KubernetesOperatorConfigOptions
+
.OPERATOR_SAVEPOINT_FORMAT_TYPE)
: null)
.get(timeout, TimeUnit.SECONDS);
savepointOpt = Optional.of(savepoint);
@@ -384,7 +386,9 @@ public abstract class AbstractFlinkService implements
FlinkService {
conf.get(FLINK_VERSION)
.isNewerVersionThan(
FlinkVersion.v1_14)
- ?
SavepointFormatType.DEFAULT
+ ? conf.get(
+
KubernetesOperatorConfigOptions
+
.OPERATOR_SAVEPOINT_FORMAT_TYPE)
: null)
.get(timeout, TimeUnit.SECONDS);
savepointOpt = Optional.of(savepoint);
@@ -442,7 +446,9 @@ public abstract class AbstractFlinkService implements
FlinkService {
false,
conf.get(FLINK_VERSION)
.isNewerVersionThan(FlinkVersion.v1_14)
- ?
SavepointFormatType.DEFAULT
+ ? conf.get(
+
KubernetesOperatorConfigOptions
+
.OPERATOR_SAVEPOINT_FORMAT_TYPE)
: null,
null))
.get(timeout, TimeUnit.SECONDS);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
index d96427e4..413a6943 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
@@ -57,6 +57,8 @@ public class TestingClusterClient<T> extends
RestClusterClient<T> {
stopWithSavepointFunction =
(ignore1, ignore2, savepointPath) ->
CompletableFuture.completedFuture(savepointPath);
+ private TriFunction<JobID, SavepointFormatType, String,
CompletableFuture<String>>
+ stopWithSavepointFormat;
private TriFunction<
MessageHeaders<?, ?, ?>,
MessageParameters,
@@ -97,6 +99,12 @@ public class TestingClusterClient<T> extends
RestClusterClient<T> {
this.stopWithSavepointFunction = stopWithSavepointFunction;
}
+ public void setStopWithSavepointFormat(
+ TriFunction<JobID, SavepointFormatType, String,
CompletableFuture<String>>
+ stopWithSavepointFormat) {
+ this.stopWithSavepointFormat = stopWithSavepointFormat;
+ }
+
public void setRequestProcessor(
TriFunction<
MessageHeaders<?, ?, ?>,
@@ -184,7 +192,9 @@ public class TestingClusterClient<T> extends
RestClusterClient<T> {
boolean advanceToEndOfTime,
@Nullable String savepointDirectory,
SavepointFormatType formatType) {
- return stopWithSavepointFunction.apply(jobId, advanceToEndOfTime,
savepointDirectory);
+ return stopWithSavepointFormat == null
+ ? stopWithSavepointFunction.apply(jobId, advanceToEndOfTime,
savepointDirectory)
+ : stopWithSavepointFormat.apply(jobId, formatType,
savepointDirectory);
}
@Override
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index 8e151f04..c2d13029 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -20,13 +20,16 @@ package org.apache.flink.kubernetes.operator.service;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingClusterClient;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
@@ -337,6 +340,78 @@ public class NativeFlinkServiceTest {
AbstractFlinkService.getEffectiveStatus(allFinished));
}
+ @Test
+ public void testNativeSavepointFormat() throws Exception {
+ final TestingClusterClient<String> testingClusterClient =
+ new TestingClusterClient<>(configuration,
TestUtils.TEST_DEPLOYMENT_NAME);
+ final String savepointPath = "file:///path/of/svp";
+ final CompletableFuture<Tuple4<JobID, String, Boolean,
SavepointFormatType>>
+ triggerSavepointFuture = new CompletableFuture<>();
+ configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY,
savepointPath);
+ testingClusterClient.setRequestProcessor(
+ (headers, parameters, requestBody) -> {
+ triggerSavepointFuture.complete(
+ new Tuple4<>(
+ ((SavepointTriggerMessageParameters)
parameters)
+ .jobID.getValue(),
+ ((SavepointTriggerRequestBody) requestBody)
+ .getTargetDirectory()
+ .get(),
+ ((SavepointTriggerRequestBody)
requestBody).isCancelJob(),
+ ((SavepointTriggerRequestBody)
requestBody).getFormatType()));
+ return CompletableFuture.completedFuture(new
TriggerResponse(new TriggerId()));
+ });
+ final CompletableFuture<Tuple3<JobID, SavepointFormatType, String>>
+ stopWithSavepointFuture = new CompletableFuture<>();
+ testingClusterClient.setStopWithSavepointFormat(
+ (id, formatType, savepointDir) -> {
+ stopWithSavepointFuture.complete(new Tuple3<>(id,
formatType, savepointDir));
+ return CompletableFuture.completedFuture(savepointPath);
+ });
+
+ final FlinkService flinkService =
createFlinkService(testingClusterClient);
+
+ final JobID jobID = JobID.generate();
+ final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ deployment
+ .getSpec()
+ .getFlinkConfiguration()
+ .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
savepointPath);
+
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+ JobStatus jobStatus = deployment.getStatus().getJobStatus();
+ jobStatus.setJobId(jobID.toHexString());
+
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+ ReconciliationUtils.updateStatusForDeployedSpec(deployment, new
Configuration());
+
+ jobStatus.setJobId(jobID.toString());
+ deployment.getStatus().setJobStatus(jobStatus);
+ flinkService.triggerSavepoint(
+ deployment.getStatus().getJobStatus().getJobId(),
+ SavepointTriggerType.MANUAL,
+ deployment.getStatus().getJobStatus().getSavepointInfo(),
+ new Configuration(configuration)
+ .set(
+
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE,
+ SavepointFormatType.NATIVE));
+ assertTrue(triggerSavepointFuture.isDone());
+ assertEquals(jobID, triggerSavepointFuture.get().f0);
+ assertEquals(savepointPath, triggerSavepointFuture.get().f1);
+ assertFalse(triggerSavepointFuture.get().f2);
+ assertEquals(SavepointFormatType.NATIVE,
triggerSavepointFuture.get().f3);
+
+ flinkService.cancelJob(
+ deployment,
+ UpgradeMode.SAVEPOINT,
+ new Configuration(configManager.getObserveConfig(deployment))
+ .set(
+
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE,
+ SavepointFormatType.NATIVE));
+ assertTrue(stopWithSavepointFuture.isDone());
+ assertEquals(jobID, stopWithSavepointFuture.get().f0);
+ assertEquals(SavepointFormatType.NATIVE,
stopWithSavepointFuture.get().f1);
+ assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
+ }
+
private JobDetails getJobDetails(
org.apache.flink.api.common.JobStatus status,
Tuple2<ExecutionState, Integer>... tasksPerState) {