This is an automated email from the ASF dual-hosted git repository.

zhouli pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new edf8b4a53 [ISSUE-2936][Feature] Support flink native type savepoint (#2937)
edf8b4a53 is described below

commit edf8b4a53f5ee4bd9e2f2941bc3f11815bf97577
Author: xiebin <[email protected]>
AuthorDate: Wed Aug 23 16:15:20 2023 +0800

    [ISSUE-2936][Feature] Support flink native type savepoint (#2937)
    
    * support flink native type savepoint
    
    * remove some debug loggers
    
    * fix a mistake
    
    * refine code:
    hide the native format switch when savepoint is not enabled.
    add a note about the Flink version that supports the native format.
---
 .../core/controller/ApplicationController.java     |  7 +++++++
 .../core/controller/SavePointController.java       | 11 ++++++++---
 .../console/core/entity/Application.java           |  2 ++
 .../console/core/service/SavePointService.java     |  2 +-
 .../core/service/impl/ApplicationServiceImpl.java  |  1 +
 .../core/service/impl/SavePointServiceImpl.java    | 10 +++++++---
 .../system/controller/AccessTokenController.java   |  1 +
 .../src/api/flink/app/app.type.ts                  |  2 ++
 .../components/AppView/StopApplicationModal.vue    | 15 +++++++++++++-
 .../src/views/flink/app/hooks/useSavepoint.tsx     | 19 +++++++++++++++++-
 .../flink/client/bean/CancelRequest.scala          |  1 +
 .../flink/client/bean/SavepointRequestTrait.scala  |  2 ++
 .../client/bean/TriggerSavepointRequest.scala      |  1 +
 .../flink/client/trait/FlinkClientTrait.scala      | 12 ++++++++---
 .../streampark/flink/core/FlinkClientTrait.scala   | 16 ++++++++++++---
 .../streampark/flink/core/FlinkClusterClient.scala | 23 ++++++++++++++++------
 .../streampark/flink/core/FlinkClusterClient.scala | 23 ++++++++++++++++------
 .../streampark/flink/core/FlinkClusterClient.scala | 23 ++++++++++++++++------
 18 files changed, 138 insertions(+), 33 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 0f9825f3f..f692de2c6 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -279,6 +279,13 @@ public class ApplicationController {
         in = ParameterIn.QUERY,
         required = true,
         example = "false",
+        schema = @Schema(implementation = boolean.class, defaultValue = 
"false")),
+    @Parameter(
+        name = "nativeFormat",
+        description = "use savepoint native format",
+        in = ParameterIn.QUERY,
+        required = true,
+        example = "false",
         schema = @Schema(implementation = boolean.class, defaultValue = 
"false"))
   })
   @PermissionAction(id = "#app.id", type = PermissionType.APP)
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
index b5d69dde1..95553826e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
@@ -91,13 +91,18 @@ public class SavePointController {
     @Parameter(
         name = "savepointPath",
         description = "specified savepoint path",
-        schema = @Schema(implementation = String.class))
+        schema = @Schema(implementation = String.class)),
+    @Parameter(
+        name = "nativeFormat",
+        description = "use native format",
+        schema = @Schema(implementation = Boolean.class))
   })
   @ApiAccess
   @PostMapping("trigger")
   @RequiresPermissions("savepoint:trigger")
-  public RestResponse trigger(Long appId, @Nullable String savepointPath) {
-    savePointService.trigger(appId, savepointPath);
+  public RestResponse trigger(
+      Long appId, @Nullable String savepointPath, @Nullable Boolean 
nativeFormat) {
+    savePointService.trigger(appId, savepointPath, nativeFormat);
     return RestResponse.success(true);
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 7bbeffccb..edb56c6a0 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -230,6 +230,8 @@ public class Application implements Serializable {
   private transient String savePoint;
   private transient Boolean savePointed = false;
   private transient Boolean drain = false;
+  private transient Boolean nativeFormat = false;
+  private transient Long savePointTimeout = 60L;
   private transient Boolean allowNonRestored = false;
   private transient Integer restoreMode;
   private transient String socketId;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
index c6b5479f5..684709dc7 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
@@ -33,7 +33,7 @@ public interface SavePointService extends IService<SavePoint> 
{
 
   SavePoint getLatest(Long id);
 
-  void trigger(Long appId, @Nullable String savepointPath);
+  void trigger(Long appId, @Nullable String savepointPath, @Nullable Boolean 
nativeFormat);
 
   Boolean delete(Long id, Application application) throws InternalException;
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 3dcf471ce..76c5089b7 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1299,6 +1299,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             appParam.getSavePointed(),
             appParam.getDrain(),
             customSavepoint,
+            appParam.getNativeFormat(),
             application.getK8sNamespace());
 
     final Date triggerTime = new Date();
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 2ed5bb3b5..4622fbe41 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -165,7 +165,7 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
   }
 
   @Override
-  public void trigger(Long appId, @Nullable String savepointPath) {
+  public void trigger(Long appId, @Nullable String savepointPath, @Nullable 
Boolean nativeFormat) {
     log.info("Start to trigger savepoint for app {}", appId);
     Application application = applicationService.getById(appId);
 
@@ -187,7 +187,7 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
 
     // infer savepoint
     TriggerSavepointRequest request =
-        renderTriggerSavepointRequest(savepointPath, application, flinkEnv);
+        renderTriggerSavepointRequest(savepointPath, nativeFormat, 
application, flinkEnv);
 
     CompletableFuture<SavepointResponse> savepointFuture =
         CompletableFuture.supplyAsync(() -> 
FlinkClient.triggerSavepoint(request), executorService);
@@ -481,7 +481,10 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
 
   @Nonnull
   private TriggerSavepointRequest renderTriggerSavepointRequest(
-      @Nullable String savepointPath, Application application, FlinkEnv 
flinkEnv) {
+      @Nullable String savepointPath,
+      Boolean nativeFormat,
+      Application application,
+      FlinkEnv flinkEnv) {
     String customSavepoint = this.getFinalSavepointDir(savepointPath, 
application);
 
     FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
@@ -496,6 +499,7 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
         clusterId,
         application.getJobId(),
         customSavepoint,
+        nativeFormat,
         application.getK8sNamespace());
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java
index 09dd0b55c..9a58bd8b6 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java
@@ -186,6 +186,7 @@ public class AccessTokenController {
               .addFormData("id", appId)
               .addFormData("savePointed", "false")
               .addFormData("drain", "false")
+              .addFormData("nativeFormat", "false")
               .addFormData("savePoint", "")
               .build();
     }
diff --git 
a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts 
b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
index 64b6f9a93..71778e6cb 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
@@ -119,6 +119,7 @@ export interface AppListRecord {
   savePoint?: any;
   savePointed: boolean;
   drain: boolean;
+  nativeFormat: boolean;
   allowNonRestored: boolean;
   socketId?: any;
   projectName?: any;
@@ -149,6 +150,7 @@ export interface CancelParam {
   id: string;
   savePointed: boolean;
   drain: boolean;
+  nativeFormat: boolean;
   savePoint: string;
 }
 // create Params
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
index f8da5b674..7f47a2304 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
@@ -67,6 +67,18 @@
         afterItem: () => h('span', { class: 'tip-info' }, 'cancel job with 
savepoint path'),
         ifShow: ({ values }) => !!values.stopSavePointed,
       },
+      {
+        field: 'nativeFormat',
+        label: 'NativeFormat',
+        component: 'Switch',
+        componentProps: {
+          checkedChildren: 'ON',
+          unCheckedChildren: 'OFF',
+        },
+        defaultValue: false,
+        afterItem: () => h('span', { class: 'conf-switch' }, 'Note: native 
format savepoint is supported since flink 1.15'),
+        ifShow: ({ values }) => !!values.stopSavePointed,
+      },
       {
         field: 'drain',
         label: 'Drain',
@@ -89,12 +101,13 @@
   /* submit */
   async function handleSubmit() {
     try {
-      const { stopSavePointed, customSavepoint, drain } = (await validate()) 
as Recordable;
+      const { stopSavePointed, customSavepoint, drain, nativeFormat } = (await 
validate()) as Recordable;
       const stopReq = {
         id: app.id,
         savePointed: stopSavePointed,
         savePoint: customSavepoint,
         drain: drain,
+        nativeFormat: nativeFormat,
       };
 
       if (stopSavePointed) {
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
index 106c33cdc..a7ffb3994 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
@@ -17,7 +17,7 @@
 import Icon from '/@/components/Icon';
 import { useMessage } from '/@/hooks/web/useMessage';
 import { useI18n } from '/@/hooks/web/useI18n';
-import { Form, Input } from 'ant-design-vue';
+import { Form, Input, Switch} from 'ant-design-vue';
 import { fetchCheckSavepointPath } from '/@/api/flink/app/app';
 import { trigger } from '/@/api/flink/app/savepoint';
 import { ref, unref } from 'vue';
@@ -28,6 +28,7 @@ export const useSavepoint = (updateOption: Fn) => {
   const submitLoading = ref(false);
   const appId = ref('');
   const customSavepoint = ref('');
+  const nativeFormat = ref(false);
 
   async function handleSavepointAction(savepointTriggerReq: {
     appId: string | number;
@@ -74,6 +75,20 @@ export const useSavepoint = (updateOption: Fn) => {
                 onInput={(e) => (customSavepoint.value = e.target.value || '')}
               />
             </Form.Item>
+
+            <Form.Item
+              name="nativeFormat"
+              label="Native Format"
+              label-col={{ lg: { span: 7, offset: 0 }, sm: { span: 7, offset: 
0 } }}
+              wrapper-col={{ lg: { span: 16, offset: 0 }, sm: { span: 4, 
offset: 0 } }}
+            >
+              <Switch
+                checkedValue={true}
+                unCheckedValue={false}
+                checked={nativeFormat.value}
+                onClick={(checked, e) => (nativeFormat.value = checked || 
false)}
+              />
+            </Form.Item>
           </Form>
         );
       },
@@ -83,6 +98,7 @@ export const useSavepoint = (updateOption: Fn) => {
             const savepointReq = {
               appId: appId.value,
               savepointPath: unref(customSavepoint),
+              nativeFormat: unref(nativeFormat),
             };
             if (unref(customSavepoint)) {
               submitLoading.value = true;
@@ -118,6 +134,7 @@ export const useSavepoint = (updateOption: Fn) => {
       },
       onCancel: () => {
         customSavepoint.value = '';
+        nativeFormat.value = false;
       },
     });
   };
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
index f11f561c9..dd5481dc4 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
@@ -33,5 +33,6 @@ case class CancelRequest(
     override val withSavepoint: Boolean,
     withDrain: Boolean,
     savepointPath: String,
+    nativeFormat: Boolean,
     override val kubernetesNamespace: String = 
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
   extends SavepointRequestTrait
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
index 11e3866cf..34bc21c61 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
@@ -38,6 +38,8 @@ trait SavepointRequestTrait {
 
   val savepointPath: String
 
+  val nativeFormat: Boolean
+
   val kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE
 
   @Nullable val properties: JavaMap[String, Any]
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
index 8f6344834..035c5348d 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
@@ -32,5 +32,6 @@ case class TriggerSavepointRequest(
     clusterId: String,
     jobId: String,
     savepointPath: String,
+    nativeFormat: Boolean,
     override val kubernetesNamespace: String = 
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
   extends SavepointRequestTrait
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 5c65da5ba..b75bc2b7a 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -158,6 +158,7 @@ trait FlinkClientTrait extends Logger {
          |     flinkVersion   : ${savepointRequest.flinkVersion.version}
          |     clusterId      : ${savepointRequest.clusterId}
          |     savePointPath  : ${savepointRequest.savepointPath}
+         |     nativeFormat   : ${savepointRequest.nativeFormat}
          |     k8sNamespace   : ${savepointRequest.kubernetesNamespace}
          |     appId          : ${savepointRequest.clusterId}
          |     jobId          : ${savepointRequest.jobId}
@@ -178,6 +179,7 @@ trait FlinkClientTrait extends Logger {
          |     withSavePoint     : ${cancelRequest.withSavepoint}
          |     savePointPath     : ${cancelRequest.savepointPath}
          |     withDrain         : ${cancelRequest.withDrain}
+         |     nativeFormat      : ${cancelRequest.nativeFormat}
          |     k8sNamespace      : ${cancelRequest.kubernetesNamespace}
          |     appId             : ${cancelRequest.clusterId}
          |     jobId             : ${cancelRequest.jobId}
@@ -517,11 +519,15 @@ trait FlinkClientTrait extends Logger {
         null
       case (true, false) =>
         clientWrapper
-          .cancelWithSavepoint(jobID, savePointDir)
+          .cancelWithSavepoint(jobID, savePointDir, cancelRequest.nativeFormat)
           .get()
       case (_, _) =>
         clientWrapper
-          .stopWithSavepoint(jobID, cancelRequest.withDrain, savePointDir)
+          .stopWithSavepoint(
+            jobID,
+            cancelRequest.withDrain,
+            savePointDir,
+            cancelRequest.nativeFormat)
           .get()
     }
   }
@@ -559,7 +565,7 @@ trait FlinkClientTrait extends Logger {
       client: ClusterClient[_]): String = {
     val savepointPath = tryGetSavepointPathIfNeed(savepointRequest)
     val clientWrapper = new FlinkClusterClient(client)
-    clientWrapper.triggerSavepoint(jobID, savepointPath).get()
+    clientWrapper.triggerSavepoint(jobID, savepointPath, 
savepointRequest.nativeFormat).get()
   }
 
 }
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
index 6d8393ee8..2d149bdd5 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
@@ -24,15 +24,25 @@ import java.util.concurrent.CompletableFuture
 
 abstract class FlinkClientTrait[T](clusterClient: ClusterClient[T]) {
 
-  def triggerSavepoint(jobID: JobID, savepointDir: String): 
CompletableFuture[String] = {
+  def triggerSavepoint(
+      jobID: JobID,
+      savepointDir: String,
+      nativeFormat: Boolean): CompletableFuture[String] = {
     clusterClient.triggerSavepoint(jobID, savepointDir)
   }
 
-  def cancelWithSavepoint(jobID: JobID, s: String): CompletableFuture[String] 
= {
+  def cancelWithSavepoint(
+      jobID: JobID,
+      s: String,
+      nativeFormat: Boolean): CompletableFuture[String] = {
     clusterClient.cancelWithSavepoint(jobID, s)
   }
 
-  def stopWithSavepoint(jobID: JobID, b: Boolean, s: String): 
CompletableFuture[String] =
+  def stopWithSavepoint(
+      jobID: JobID,
+      b: Boolean,
+      s: String,
+      nativeFormat: Boolean): CompletableFuture[String] =
     clusterClient.stopWithSavepoint(jobID, b, s)
 
 }
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
index 4f6336f5a..c3a39ef14 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -25,25 +25,36 @@ import java.util.concurrent.CompletableFuture
 class FlinkClusterClient[T](clusterClient: ClusterClient[T])
   extends FlinkClientTrait[T](clusterClient) {
 
-  override def triggerSavepoint(jobID: JobID, savepointDir: String): 
CompletableFuture[String] = {
-    clusterClient.triggerSavepoint(jobID, savepointDir, 
SavepointFormatType.DEFAULT)
+  override def triggerSavepoint(
+      jobID: JobID,
+      savepointDir: String,
+      nativeFormat: Boolean): CompletableFuture[String] = {
+    clusterClient.triggerSavepoint(
+      jobID,
+      savepointDir,
+      if (nativeFormat) SavepointFormatType.NATIVE else 
SavepointFormatType.CANONICAL)
   }
 
   override def cancelWithSavepoint(
       jobID: JobID,
-      savepointDirectory: String): CompletableFuture[String] = {
-    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, 
SavepointFormatType.DEFAULT)
+      savepointDirectory: String,
+      nativeFormat: Boolean): CompletableFuture[String] = {
+    clusterClient.cancelWithSavepoint(
+      jobID,
+      savepointDirectory,
+      if (nativeFormat) SavepointFormatType.NATIVE else 
SavepointFormatType.CANONICAL)
   }
 
   override def stopWithSavepoint(
       jobID: JobID,
       advanceToEndOfEventTime: Boolean,
-      savepointDirectory: String): CompletableFuture[String] = {
+      savepointDirectory: String,
+      nativeFormat: Boolean): CompletableFuture[String] = {
     clusterClient.stopWithSavepoint(
       jobID,
       advanceToEndOfEventTime,
       savepointDirectory,
-      SavepointFormatType.DEFAULT)
+      if (nativeFormat) SavepointFormatType.NATIVE else 
SavepointFormatType.CANONICAL)
   }
 
 }
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
index 4f6336f5a..c3a39ef14 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -25,25 +25,36 @@ import java.util.concurrent.CompletableFuture
 class FlinkClusterClient[T](clusterClient: ClusterClient[T])
   extends FlinkClientTrait[T](clusterClient) {
 
-  override def triggerSavepoint(jobID: JobID, savepointDir: String): 
CompletableFuture[String] = {
-    clusterClient.triggerSavepoint(jobID, savepointDir, 
SavepointFormatType.DEFAULT)
+  override def triggerSavepoint(
+      jobID: JobID,
+      savepointDir: String,
+      nativeFormat: Boolean): CompletableFuture[String] = {
+    clusterClient.triggerSavepoint(
+      jobID,
+      savepointDir,
+      if (nativeFormat) SavepointFormatType.NATIVE else 
SavepointFormatType.CANONICAL)
   }
 
   override def cancelWithSavepoint(
       jobID: JobID,
-      savepointDirectory: String): CompletableFuture[String] = {
-    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, 
SavepointFormatType.DEFAULT)
+      savepointDirectory: String,
+      nativeFormat: Boolean): CompletableFuture[String] = {
+    clusterClient.cancelWithSavepoint(
+      jobID,
+      savepointDirectory,
+      if (nativeFormat) SavepointFormatType.NATIVE else 
SavepointFormatType.CANONICAL)
   }
 
   override def stopWithSavepoint(
       jobID: JobID,
       advanceToEndOfEventTime: Boolean,
-      savepointDirectory: String): CompletableFuture[String] = {
+      savepointDirectory: String,
+      nativeFormat: Boolean): CompletableFuture[String] = {
     clusterClient.stopWithSavepoint(
       jobID,
       advanceToEndOfEventTime,
       savepointDirectory,
-      SavepointFormatType.DEFAULT)
+      if (nativeFormat) SavepointFormatType.NATIVE else 
SavepointFormatType.CANONICAL)
   }
 
 }
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
index 4f6336f5a..c3a39ef14 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.17/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -25,25 +25,36 @@ import java.util.concurrent.CompletableFuture
 class FlinkClusterClient[T](clusterClient: ClusterClient[T])
   extends FlinkClientTrait[T](clusterClient) {
 
-  override def triggerSavepoint(jobID: JobID, savepointDir: String): 
CompletableFuture[String] = {
-    clusterClient.triggerSavepoint(jobID, savepointDir, 
SavepointFormatType.DEFAULT)
+  override def triggerSavepoint(
+      jobID: JobID,
+      savepointDir: String,
+      nativeFormat: Boolean): CompletableFuture[String] = {
+    clusterClient.triggerSavepoint(
+      jobID,
+      savepointDir,
+      if (nativeFormat) SavepointFormatType.NATIVE else 
SavepointFormatType.CANONICAL)
   }
 
   override def cancelWithSavepoint(
       jobID: JobID,
-      savepointDirectory: String): CompletableFuture[String] = {
-    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, 
SavepointFormatType.DEFAULT)
+      savepointDirectory: String,
+      nativeFormat: Boolean): CompletableFuture[String] = {
+    clusterClient.cancelWithSavepoint(
+      jobID,
+      savepointDirectory,
+      if (nativeFormat) SavepointFormatType.NATIVE else 
SavepointFormatType.CANONICAL)
   }
 
   override def stopWithSavepoint(
       jobID: JobID,
       advanceToEndOfEventTime: Boolean,
-      savepointDirectory: String): CompletableFuture[String] = {
+      savepointDirectory: String,
+      nativeFormat: Boolean): CompletableFuture[String] = {
     clusterClient.stopWithSavepoint(
       jobID,
       advanceToEndOfEventTime,
       savepointDirectory,
-      SavepointFormatType.DEFAULT)
+      if (nativeFormat) SavepointFormatType.NATIVE else 
SavepointFormatType.CANONICAL)
   }
 
 }

Reply via email to