This is an automated email from the ASF dual-hosted git repository.
zhouli pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new edf8b4a53 [ISSUE-2936][Feature] Support flink native type savepoint
(#2937)
edf8b4a53 is described below
commit edf8b4a53f5ee4bd9e2f2941bc3f11815bf97577
Author: xiebin <[email protected]>
AuthorDate: Wed Aug 23 16:15:20 2023 +0800
[ISSUE-2936][Feature] Support flink native type savepoint (#2937)
* support flink native type savepoint
* remove some debug loggers
* fix a mistake
* refine code:
hide native format switch component without savepoint.
add supported flink version note of native format.
---
.../core/controller/ApplicationController.java | 7 +++++++
.../core/controller/SavePointController.java | 11 ++++++++---
.../console/core/entity/Application.java | 2 ++
.../console/core/service/SavePointService.java | 2 +-
.../core/service/impl/ApplicationServiceImpl.java | 1 +
.../core/service/impl/SavePointServiceImpl.java | 10 +++++++---
.../system/controller/AccessTokenController.java | 1 +
.../src/api/flink/app/app.type.ts | 2 ++
.../components/AppView/StopApplicationModal.vue | 15 +++++++++++++-
.../src/views/flink/app/hooks/useSavepoint.tsx | 19 +++++++++++++++++-
.../flink/client/bean/CancelRequest.scala | 1 +
.../flink/client/bean/SavepointRequestTrait.scala | 2 ++
.../client/bean/TriggerSavepointRequest.scala | 1 +
.../flink/client/trait/FlinkClientTrait.scala | 12 ++++++++---
.../streampark/flink/core/FlinkClientTrait.scala | 16 ++++++++++++---
.../streampark/flink/core/FlinkClusterClient.scala | 23 ++++++++++++++++------
.../streampark/flink/core/FlinkClusterClient.scala | 23 ++++++++++++++++------
.../streampark/flink/core/FlinkClusterClient.scala | 23 ++++++++++++++++------
18 files changed, 138 insertions(+), 33 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 0f9825f3f..f692de2c6 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -279,6 +279,13 @@ public class ApplicationController {
in = ParameterIn.QUERY,
required = true,
example = "false",
+ schema = @Schema(implementation = boolean.class, defaultValue =
"false")),
+ @Parameter(
+ name = "nativeFormat",
+ description = "use savepoint native format",
+ in = ParameterIn.QUERY,
+ required = true,
+ example = "false",
schema = @Schema(implementation = boolean.class, defaultValue =
"false"))
})
@PermissionAction(id = "#app.id", type = PermissionType.APP)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
index b5d69dde1..95553826e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
@@ -91,13 +91,18 @@ public class SavePointController {
@Parameter(
name = "savepointPath",
description = "specified savepoint path",
- schema = @Schema(implementation = String.class))
+ schema = @Schema(implementation = String.class)),
+ @Parameter(
+ name = "nativeFormat",
+ description = "use native format",
+ schema = @Schema(implementation = Boolean.class))
})
@ApiAccess
@PostMapping("trigger")
@RequiresPermissions("savepoint:trigger")
- public RestResponse trigger(Long appId, @Nullable String savepointPath) {
- savePointService.trigger(appId, savepointPath);
+ public RestResponse trigger(
+ Long appId, @Nullable String savepointPath, @Nullable Boolean
nativeFormat) {
+ savePointService.trigger(appId, savepointPath, nativeFormat);
return RestResponse.success(true);
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 7bbeffccb..edb56c6a0 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -230,6 +230,8 @@ public class Application implements Serializable {
private transient String savePoint;
private transient Boolean savePointed = false;
private transient Boolean drain = false;
+ private transient Boolean nativeFormat = false;
+ private transient Long savePointTimeout = 60L;
private transient Boolean allowNonRestored = false;
private transient Integer restoreMode;
private transient String socketId;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
index c6b5479f5..684709dc7 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
@@ -33,7 +33,7 @@ public interface SavePointService extends IService<SavePoint>
{
SavePoint getLatest(Long id);
- void trigger(Long appId, @Nullable String savepointPath);
+ void trigger(Long appId, @Nullable String savepointPath, @Nullable Boolean
nativeFormat);
Boolean delete(Long id, Application application) throws InternalException;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 3dcf471ce..76c5089b7 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1299,6 +1299,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
appParam.getSavePointed(),
appParam.getDrain(),
customSavepoint,
+ appParam.getNativeFormat(),
application.getK8sNamespace());
final Date triggerTime = new Date();
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 2ed5bb3b5..4622fbe41 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -165,7 +165,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
}
@Override
- public void trigger(Long appId, @Nullable String savepointPath) {
+ public void trigger(Long appId, @Nullable String savepointPath, @Nullable
Boolean nativeFormat) {
log.info("Start to trigger savepoint for app {}", appId);
Application application = applicationService.getById(appId);
@@ -187,7 +187,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
// infer savepoint
TriggerSavepointRequest request =
- renderTriggerSavepointRequest(savepointPath, application, flinkEnv);
+ renderTriggerSavepointRequest(savepointPath, nativeFormat,
application, flinkEnv);
CompletableFuture<SavepointResponse> savepointFuture =
CompletableFuture.supplyAsync(() ->
FlinkClient.triggerSavepoint(request), executorService);
@@ -481,7 +481,10 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
@Nonnull
private TriggerSavepointRequest renderTriggerSavepointRequest(
- @Nullable String savepointPath, Application application, FlinkEnv
flinkEnv) {
+ @Nullable String savepointPath,
+ Boolean nativeFormat,
+ Application application,
+ FlinkEnv flinkEnv) {
String customSavepoint = this.getFinalSavepointDir(savepointPath,
application);
FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
@@ -496,6 +499,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
clusterId,
application.getJobId(),
customSavepoint,
+ nativeFormat,
application.getK8sNamespace());
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java
index 09dd0b55c..9a58bd8b6 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java
@@ -186,6 +186,7 @@ public class AccessTokenController {
.addFormData("id", appId)
.addFormData("savePointed", "false")
.addFormData("drain", "false")
+ .addFormData("nativeFormat", "false")
.addFormData("savePoint", "")
.build();
}
diff --git
a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
index 64b6f9a93..71778e6cb 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
@@ -119,6 +119,7 @@ export interface AppListRecord {
savePoint?: any;
savePointed: boolean;
drain: boolean;
+ nativeFormat: boolean;
allowNonRestored: boolean;
socketId?: any;
projectName?: any;
@@ -149,6 +150,7 @@ export interface CancelParam {
id: string;
savePointed: boolean;
drain: boolean;
+ nativeFormat: boolean;
savePoint: string;
}
// create Params
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
index f8da5b674..7f47a2304 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
@@ -67,6 +67,18 @@
afterItem: () => h('span', { class: 'tip-info' }, 'cancel job with
savepoint path'),
ifShow: ({ values }) => !!values.stopSavePointed,
},
+ {
+ field: 'nativeFormat',
+ label: 'NativeFormat',
+ component: 'Switch',
+ componentProps: {
+ checkedChildren: 'ON',
+ unCheckedChildren: 'OFF',
+ },
+ defaultValue: false,
+ afterItem: () => h('span', { class: 'conf-switch' }, 'Note: native
format savepoint is supported since flink 1.15'),
+ ifShow: ({ values }) => !!values.stopSavePointed,
+ },
{
field: 'drain',
label: 'Drain',
@@ -89,12 +101,13 @@
/* submit */
async function handleSubmit() {
try {
- const { stopSavePointed, customSavepoint, drain } = (await validate())
as Recordable;
+ const { stopSavePointed, customSavepoint, drain, nativeFormat } = (await
validate()) as Recordable;
const stopReq = {
id: app.id,
savePointed: stopSavePointed,
savePoint: customSavepoint,
drain: drain,
+ nativeFormat: nativeFormat,
};
if (stopSavePointed) {
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
index 106c33cdc..a7ffb3994 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
@@ -17,7 +17,7 @@
import Icon from '/@/components/Icon';
import { useMessage } from '/@/hooks/web/useMessage';
import { useI18n } from '/@/hooks/web/useI18n';
-import { Form, Input } from 'ant-design-vue';
+import { Form, Input, Switch} from 'ant-design-vue';
import { fetchCheckSavepointPath } from '/@/api/flink/app/app';
import { trigger } from '/@/api/flink/app/savepoint';
import { ref, unref } from 'vue';
@@ -28,6 +28,7 @@ export const useSavepoint = (updateOption: Fn) => {
const submitLoading = ref(false);
const appId = ref('');
const customSavepoint = ref('');
+ const nativeFormat = ref(false);
async function handleSavepointAction(savepointTriggerReq: {
appId: string | number;
@@ -74,6 +75,20 @@ export const useSavepoint = (updateOption: Fn) => {
onInput={(e) => (customSavepoint.value = e.target.value || '')}
/>
</Form.Item>
+
+ <Form.Item
+ name="nativeFormat"
+ label="Native Format"
+ label-col={{ lg: { span: 7, offset: 0 }, sm: { span: 7, offset:
0 } }}
+ wrapper-col={{ lg: { span: 16, offset: 0 }, sm: { span: 4,
offset: 0 } }}
+ >
+ <Switch
+ checkedValue={true}
+ unCheckedValue={false}
+ checked={nativeFormat.value}
+ onClick={(checked, e) => (nativeFormat.value = checked ||
false)}
+ />
+ </Form.Item>
</Form>
);
},
@@ -83,6 +98,7 @@ export const useSavepoint = (updateOption: Fn) => {
const savepointReq = {
appId: appId.value,
savepointPath: unref(customSavepoint),
+ nativeFormat: unref(nativeFormat),
};
if (unref(customSavepoint)) {
submitLoading.value = true;
@@ -118,6 +134,7 @@ export const useSavepoint = (updateOption: Fn) => {
},
onCancel: () => {
customSavepoint.value = '';
+ nativeFormat.value = false;
},
});
};
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
index f11f561c9..dd5481dc4 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
@@ -33,5 +33,6 @@ case class CancelRequest(
override val withSavepoint: Boolean,
withDrain: Boolean,
savepointPath: String,
+ nativeFormat: Boolean,
override val kubernetesNamespace: String =
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
extends SavepointRequestTrait
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
index 11e3866cf..34bc21c61 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
@@ -38,6 +38,8 @@ trait SavepointRequestTrait {
val savepointPath: String
+ val nativeFormat: Boolean
+
val kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE
@Nullable val properties: JavaMap[String, Any]
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
index 8f6344834..035c5348d 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
@@ -32,5 +32,6 @@ case class TriggerSavepointRequest(
clusterId: String,
jobId: String,
savepointPath: String,
+ nativeFormat: Boolean,
override val kubernetesNamespace: String =
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
extends SavepointRequestTrait
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 5c65da5ba..b75bc2b7a 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -158,6 +158,7 @@ trait FlinkClientTrait extends Logger {
| flinkVersion : ${savepointRequest.flinkVersion.version}
| clusterId : ${savepointRequest.clusterId}
| savePointPath : ${savepointRequest.savepointPath}
+ | nativeFormat : ${savepointRequest.nativeFormat}
| k8sNamespace : ${savepointRequest.kubernetesNamespace}
| appId : ${savepointRequest.clusterId}
| jobId : ${savepointRequest.jobId}
@@ -178,6 +179,7 @@ trait FlinkClientTrait extends Logger {
| withSavePoint : ${cancelRequest.withSavepoint}
| savePointPath : ${cancelRequest.savepointPath}
| withDrain : ${cancelRequest.withDrain}
+ | nativeFormat : ${cancelRequest.nativeFormat}
| k8sNamespace : ${cancelRequest.kubernetesNamespace}
| appId : ${cancelRequest.clusterId}
| jobId : ${cancelRequest.jobId}
@@ -517,11 +519,15 @@ trait FlinkClientTrait extends Logger {
null
case (true, false) =>
clientWrapper
- .cancelWithSavepoint(jobID, savePointDir)
+ .cancelWithSavepoint(jobID, savePointDir, cancelRequest.nativeFormat)
.get()
case (_, _) =>
clientWrapper
- .stopWithSavepoint(jobID, cancelRequest.withDrain, savePointDir)
+ .stopWithSavepoint(
+ jobID,
+ cancelRequest.withDrain,
+ savePointDir,
+ cancelRequest.nativeFormat)
.get()
}
}
@@ -559,7 +565,7 @@ trait FlinkClientTrait extends Logger {
client: ClusterClient[_]): String = {
val savepointPath = tryGetSavepointPathIfNeed(savepointRequest)
val clientWrapper = new FlinkClusterClient(client)
- clientWrapper.triggerSavepoint(jobID, savepointPath).get()
+ clientWrapper.triggerSavepoint(jobID, savepointPath,
savepointRequest.nativeFormat).get()
}
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
index 6d8393ee8..2d149bdd5 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
@@ -24,15 +24,25 @@ import java.util.concurrent.CompletableFuture
abstract class FlinkClientTrait[T](clusterClient: ClusterClient[T]) {
- def triggerSavepoint(jobID: JobID, savepointDir: String):
CompletableFuture[String] = {
+ def triggerSavepoint(
+ jobID: JobID,
+ savepointDir: String,
+ nativeFormat: Boolean): CompletableFuture[String] = {
clusterClient.triggerSavepoint(jobID, savepointDir)
}
- def cancelWithSavepoint(jobID: JobID, s: String): CompletableFuture[String]
= {
+ def cancelWithSavepoint(
+ jobID: JobID,
+ s: String,
+ nativeFormat: Boolean): CompletableFuture[String] = {
clusterClient.cancelWithSavepoint(jobID, s)
}
- def stopWithSavepoint(jobID: JobID, b: Boolean, s: String):
CompletableFuture[String] =
+ def stopWithSavepoint(
+ jobID: JobID,
+ b: Boolean,
+ s: String,
+ nativeFormat: Boolean): CompletableFuture[String] =
clusterClient.stopWithSavepoint(jobID, b, s)
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
index 4f6336f5a..c3a39ef14 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -25,25 +25,36 @@ import java.util.concurrent.CompletableFuture
class FlinkClusterClient[T](clusterClient: ClusterClient[T])
extends FlinkClientTrait[T](clusterClient) {
- override def triggerSavepoint(jobID: JobID, savepointDir: String):
CompletableFuture[String] = {
- clusterClient.triggerSavepoint(jobID, savepointDir,
SavepointFormatType.DEFAULT)
+ override def triggerSavepoint(
+ jobID: JobID,
+ savepointDir: String,
+ nativeFormat: Boolean): CompletableFuture[String] = {
+ clusterClient.triggerSavepoint(
+ jobID,
+ savepointDir,
+ if (nativeFormat) SavepointFormatType.NATIVE else
SavepointFormatType.CANONICAL)
}
override def cancelWithSavepoint(
jobID: JobID,
- savepointDirectory: String): CompletableFuture[String] = {
- clusterClient.cancelWithSavepoint(jobID, savepointDirectory,
SavepointFormatType.DEFAULT)
+ savepointDirectory: String,
+ nativeFormat: Boolean): CompletableFuture[String] = {
+ clusterClient.cancelWithSavepoint(
+ jobID,
+ savepointDirectory,
+ if (nativeFormat) SavepointFormatType.NATIVE else
SavepointFormatType.CANONICAL)
}
override def stopWithSavepoint(
jobID: JobID,
advanceToEndOfEventTime: Boolean,
- savepointDirectory: String): CompletableFuture[String] = {
+ savepointDirectory: String,
+ nativeFormat: Boolean): CompletableFuture[String] = {
clusterClient.stopWithSavepoint(
jobID,
advanceToEndOfEventTime,
savepointDirectory,
- SavepointFormatType.DEFAULT)
+ if (nativeFormat) SavepointFormatType.NATIVE else
SavepointFormatType.CANONICAL)
}
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
index 4f6336f5a..c3a39ef14 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -25,25 +25,36 @@ import java.util.concurrent.CompletableFuture
class FlinkClusterClient[T](clusterClient: ClusterClient[T])
extends FlinkClientTrait[T](clusterClient) {
- override def triggerSavepoint(jobID: JobID, savepointDir: String):
CompletableFuture[String] = {
- clusterClient.triggerSavepoint(jobID, savepointDir,
SavepointFormatType.DEFAULT)
+ override def triggerSavepoint(
+ jobID: JobID,
+ savepointDir: String,
+ nativeFormat: Boolean): CompletableFuture[String] = {
+ clusterClient.triggerSavepoint(
+ jobID,
+ savepointDir,
+ if (nativeFormat) SavepointFormatType.NATIVE else
SavepointFormatType.CANONICAL)
}
override def cancelWithSavepoint(
jobID: JobID,
- savepointDirectory: String): CompletableFuture[String] = {
- clusterClient.cancelWithSavepoint(jobID, savepointDirectory,
SavepointFormatType.DEFAULT)
+ savepointDirectory: String,
+ nativeFormat: Boolean): CompletableFuture[String] = {
+ clusterClient.cancelWithSavepoint(
+ jobID,
+ savepointDirectory,
+ if (nativeFormat) SavepointFormatType.NATIVE else
SavepointFormatType.CANONICAL)
}
override def stopWithSavepoint(
jobID: JobID,
advanceToEndOfEventTime: Boolean,
- savepointDirectory: String): CompletableFuture[String] = {
+ savepointDirectory: String,
+ nativeFormat: Boolean): CompletableFuture[String] = {
clusterClient.stopWithSavepoint(
jobID,
advanceToEndOfEventTime,
savepointDirectory,
- SavepointFormatType.DEFAULT)
+ if (nativeFormat) SavepointFormatType.NATIVE else
SavepointFormatType.CANONICAL)
}
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
index 4f6336f5a..c3a39ef14 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -25,25 +25,36 @@ import java.util.concurrent.CompletableFuture
class FlinkClusterClient[T](clusterClient: ClusterClient[T])
extends FlinkClientTrait[T](clusterClient) {
- override def triggerSavepoint(jobID: JobID, savepointDir: String):
CompletableFuture[String] = {
- clusterClient.triggerSavepoint(jobID, savepointDir,
SavepointFormatType.DEFAULT)
+ override def triggerSavepoint(
+ jobID: JobID,
+ savepointDir: String,
+ nativeFormat: Boolean): CompletableFuture[String] = {
+ clusterClient.triggerSavepoint(
+ jobID,
+ savepointDir,
+ if (nativeFormat) SavepointFormatType.NATIVE else
SavepointFormatType.CANONICAL)
}
override def cancelWithSavepoint(
jobID: JobID,
- savepointDirectory: String): CompletableFuture[String] = {
- clusterClient.cancelWithSavepoint(jobID, savepointDirectory,
SavepointFormatType.DEFAULT)
+ savepointDirectory: String,
+ nativeFormat: Boolean): CompletableFuture[String] = {
+ clusterClient.cancelWithSavepoint(
+ jobID,
+ savepointDirectory,
+ if (nativeFormat) SavepointFormatType.NATIVE else
SavepointFormatType.CANONICAL)
}
override def stopWithSavepoint(
jobID: JobID,
advanceToEndOfEventTime: Boolean,
- savepointDirectory: String): CompletableFuture[String] = {
+ savepointDirectory: String,
+ nativeFormat: Boolean): CompletableFuture[String] = {
clusterClient.stopWithSavepoint(
jobID,
advanceToEndOfEventTime,
savepointDirectory,
- SavepointFormatType.DEFAULT)
+ if (nativeFormat) SavepointFormatType.NATIVE else
SavepointFormatType.CANONICAL)
}
}