This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 880d8eac6 [Feature][Issue-2192] Support trigger savepoint manually. 
(#2268)
880d8eac6 is described below

commit 880d8eac6a7e3bd6d26f72503cfc0a188db9b9fb
Author: Pan Yuepeng <[email protected]>
AuthorDate: Thu Feb 23 22:52:45 2023 +0800

    [Feature][Issue-2192] Support trigger savepoint manually. (#2268)
    
    * [Feature][Issue-2192] Support trigger savepoint manually
    
    Change the sql ddl for t_flink_savepoint
    
    [Feature][Issue-2192] Support trigger savepoint manually
    
    Add columns info for savepoint & mapper
    
    [Feature][Issue-2192] Support trigger savepoint manually
    
    Add the triggerSavepoint interface;
    Adapt the new columns fill logic for corresponding code segment;
    Add the savepoint triggered by users store logic.
    
    [Feature][Issue-2192] Support trigger savepoint manually
    
    Add the savepoint icon-button and the corresponding logic about calling 
triggerSavepoint interface to backend.
    
    Fix code style.
    
    Update based on review comments.
    
    Move interface path;
    Standardise the service arrangement;
    
    Update based on review comments.
    
    Adjust the corresponding parts
    
    Update based on review comments.
    
    Adjust the corresponding sql (interface auth in menu)
    
    Update based on review comments
    
    Remove the redundant fields of savepoint
    
    Keep mini-change rule.
    
    Updated based on review comments.
    
    Updated based on review comments.
    Change the AlertException to ApiAlertException.
    
    Updated based on review comments
    
    Add swagger annotations
    
    Introduce SavepointRequestTrait for TriggerSavepointRequest & CancelRequest
    
    Introduce SavepointResponse
    
    Introduce triggerSavePoint for FlinkClient
    
    Introduce triggerSavepoint for FlinkClientTrait
    
    Introduce triggerSavepoint for Submit
    
    Introduce triggerSavepoint for FlinkClient
    
    Adapt CheckpointProcessor for processing savepoint
    
    Adapt the calling for SavePointService
    
    Adapt the savepoint process for app cancel
    
    Address benjobs's review comments
    
    Optimise the savepoint/checkpoint store logic into checkpoint processor.
    
    Address fanrui's comments
    
    Change the savepoint cache key format
    
    * Address ben's comments
    
    * Address rui.fan's comments
    
    * Fix the execution config item bug when calling in k8s-session 
execution-mode.
    
    * Update based on comments
    
    * Address rui.fan's comments
    
    * Address benjob's comments
    
    * Add the comment lines to clarify the reason for the change.
---
 .../src/main/assembly/script/data/mysql-data.sql   |   1 +
 .../src/main/assembly/script/data/pgsql-data.sql   |   1 +
 .../main/assembly/script/schema/mysql-schema.sql   |   2 +-
 .../main/assembly/script/schema/pgsql-schema.sql   |   2 +-
 .../main/assembly/script/upgrade/mysql/2.1.0.sql   |  17 +-
 .../main/assembly/script/upgrade/pgsql/2.1.0.sql   |  17 +-
 .../core/controller/SavePointController.java       |  28 +++
 .../console/core/metrics/flink/CheckPoints.java    |  25 +++
 .../console/core/service/SavePointService.java     |   6 +
 .../core/service/impl/ApplicationServiceImpl.java  |  65 +-----
 .../core/service/impl/SavePointServiceImpl.java    | 227 +++++++++++++++++++++
 .../console/core/task/CheckpointProcessor.java     | 118 +++++++++--
 .../core/task/FlinkK8sChangeEventListener.java     |   2 +-
 .../console/core/task/FlinkRESTAPIWatcher.java     |   2 +-
 .../src/main/resources/db/data-h2.sql              |   1 +
 .../src/main/resources/db/schema-h2.sql            |   2 +-
 .../src/api/flink/app/savepoint.ts                 |   9 +
 .../src/locales/lang/en/flink/app.ts               |   2 +
 .../src/locales/lang/zh-CN/flink/app.ts            |   2 +
 .../src/views/flink/app/View.vue                   |  12 +-
 .../AppView/SavepointApplicationModal.vue          | 127 ++++++++++++
 .../src/views/flink/app/hooks/useApp.tsx           |   1 +
 .../src/views/flink/app/hooks/useAppTableAction.ts |  19 +-
 .../streampark/flink/client/FlinkClient.scala      |  18 ++
 .../flink/client/bean/CancelRequest.scala          |   8 +-
 ...elRequest.scala => SavepointRequestTrait.scala} |  32 ++-
 ...CancelRequest.scala => SavepointResponse.scala} |  18 +-
 ...Request.scala => TriggerSavepointRequest.scala} |  22 +-
 .../flink/client/FlinkClientHandler.scala          |  13 ++
 .../impl/KubernetesNativeApplicationSubmit.scala   |   4 +
 .../impl/KubernetesNativeSessionSubmit.scala       |   7 +-
 .../streampark/flink/client/impl/LocalSubmit.scala |   4 +
 .../flink/client/impl/RemoteSubmit.scala           |  48 +++--
 .../flink/client/impl/YarnSessionSubmit.scala      |  26 ++-
 .../flink/client/trait/FlinkSubmitTrait.scala      |  72 +++++--
 .../client/trait/KubernetesNativeSubmitTrait.scala |  35 +++-
 .../flink/client/trait/YarnSubmitTrait.scala       |  29 ++-
 .../streampark/flink/core/FlinkClientTrait.scala   |   4 +
 .../streampark/flink/core/FlinkClusterClient.scala |   4 +
 .../streampark/flink/core/FlinkClusterClient.scala |   4 +
 40 files changed, 820 insertions(+), 216 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
index a613a706f..6b2265ac3 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
@@ -117,6 +117,7 @@ insert into `t_menu` values (100066, 100015, 'view', null, 
null, 'app:view', nul
 insert into `t_menu` values (100067, 100053, 'view', NULL, NULL, 
'variable:view', NULL, '1', 1, null, now(), now());
 insert into `t_menu` values (100068, 100033, 'view', null, null, 
'setting:view', null, '1', 1, null, now(), now());
 insert into `t_menu` values (100069, 100053, 'depend view', null, null, 
'variable:depend_apps', null, '1', 1, NULL, now(), now());
+insert into `t_menu` values (100070, 100015, 'savepoint trigger', null, null, 
'savepoint:trigger', null, '1', 1, null, now(), now());
 
 -- ----------------------------
 -- Records of t_role
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
index fff55a58a..ad68d03b0 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
@@ -113,6 +113,7 @@ insert into "public"."t_menu" values (100066, 100015, 
'view', null, null, 'app:v
 insert into "public"."t_menu" values (100067, 100053, 'view', NULL, NULL, 
'variable:view', NULL, '1', '1', null, now(), now());
 insert into "public"."t_menu" values (100068, 100033, 'view', null, null, 
'setting:view', null, '1', '1', null, now(), now());
 insert into "public"."t_menu" values (100069, 100053, 'depend view', null, 
null, 'variable:depend_apps', null, '1', '1', NULL, now(), now());
+insert into "public"."t_menu" values (100070, 100015, 'savepoint trigger', 
null, null, 'savepoint:trigger', null, '1', '1', null, now(), now());
 
 -- ----------------------------
 -- Records of t_role
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index d1eaa6f0f..d0b3c8b6e 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -212,7 +212,7 @@ create table `t_flink_savepoint` (
   `app_id` bigint not null,
   `chk_id` bigint default null,
   `type` tinyint default null,
-  `path` varchar(255) collate utf8mb4_general_ci default null,
+  `path` varchar(1024) collate utf8mb4_general_ci default null,
   `latest` tinyint not null default 1,
   `trigger_time` datetime default null,
   `create_time` datetime not null default current_timestamp comment 'create 
time',
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index 65dfc108b..997007c47 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -453,7 +453,7 @@ create table "public"."t_flink_savepoint" (
   "app_id" int8 not null,
   "chk_id" int8,
   "type" int2,
-  "path" varchar(255) collate "pg_catalog"."default",
+  "path" varchar(1024) collate "pg_catalog"."default",
   "latest" boolean not null default true,
   "trigger_time" timestamp(6),
   "create_time" timestamp(6) not null default timezone('UTC-8'::text, 
(now())::timestamp(0) without time zone)
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.0.sql
similarity index 60%
copy from 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
copy to 
streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.0.sql
index 23e89e6f3..b1f8c2e2a 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.0.sql
@@ -15,19 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+-- ISSUE-2192 DDL & DML Start
 
-import java.util.concurrent.CompletableFuture
+alter table `t_flink_savepoint` modify column `path`  varchar(1024) collate 
utf8mb4_general_ci default null;
 
-import org.apache.flink.api.common.JobID
-import org.apache.flink.client.program.ClusterClient
+insert into `t_menu` values (100070, 100015, 'savepoint trigger', null, null, 
'savepoint:trigger', null, '1', 1, null, now(), now());
 
-abstract class FlinkClientTrait[T](clusterClient: ClusterClient[T]) {
-
-  def cancelWithSavepoint(jobID: JobID, s: String): CompletableFuture[String] 
= {
-    clusterClient.cancelWithSavepoint(jobID, s)
-  }
-
-  def stopWithSavepoint(jobID: JobID, b: Boolean, s: String): 
CompletableFuture[String] = clusterClient.stopWithSavepoint(jobID, b, s)
-
-}
+-- ISSUE-2192 DDL & DML End
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.0.sql
similarity index 60%
copy from 
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
copy to 
streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.0.sql
index 23e89e6f3..81cc45bad 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.0.sql
@@ -15,19 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.core
+-- ISSUE-2192 DDL & DML Start
 
-import java.util.concurrent.CompletableFuture
+alter table "public"."t_flink_savepoint" alter column "path" type 
varchar(1024) collate "pg_catalog"."default";
 
-import org.apache.flink.api.common.JobID
-import org.apache.flink.client.program.ClusterClient
+insert into "public"."t_menu" values (100070, 100015, 'savepoint trigger', 
null, null, 'savepoint:trigger', null, '1', '1', null, now(), now());
 
-abstract class FlinkClientTrait[T](clusterClient: ClusterClient[T]) {
-
-  def cancelWithSavepoint(jobID: JobID, s: String): CompletableFuture[String] 
= {
-    clusterClient.cancelWithSavepoint(jobID, s)
-  }
-
-  def stopWithSavepoint(jobID: JobID, b: Boolean, s: String): 
CompletableFuture[String] = clusterClient.stopWithSavepoint(jobID, b, s)
-
-}
+-- ISSUE-2192 DDL & DML End
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
index 4b7c1f254..82013f468 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
@@ -20,12 +20,16 @@ package org.apache.streampark.console.core.controller;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.base.exception.InternalException;
+import org.apache.streampark.console.core.annotation.ApiAccess;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.SavePoint;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.SavePointService;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
+import io.swagger.annotations.ApiOperation;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shiro.authz.annotation.RequiresPermissions;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -34,6 +38,8 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
+import javax.annotation.Nullable;
+
 @Slf4j
 @Validated
 @RestController
@@ -64,4 +70,26 @@ public class SavePointController {
     Boolean deleted = savePointService.delete(id, application);
     return RestResponse.success(deleted);
   }
+
+  @ApiAccess
+  @ApiOperation(value = "Trigger savepoint for specified application by id.")
+  @ApiImplicitParams({
+    @ApiImplicitParam(
+        name = "appId",
+        value = "application id",
+        required = true,
+        paramType = "query",
+        dataTypeClass = Long.class),
+    @ApiImplicitParam(
+        name = "savepointPath",
+        value = "specified savepoint path",
+        paramType = "query",
+        dataTypeClass = String.class)
+  })
+  @PostMapping("trigger")
+  @RequiresPermissions("savepoint:trigger")
+  public RestResponse trigger(Long appId, @Nullable String savepointPath) {
+    savePointService.trigger(appId, savepointPath);
+    return RestResponse.success(true);
+  }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java
index 4b2dbba8f..b92fc4059 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java
@@ -20,11 +20,15 @@ package org.apache.streampark.console.core.metrics.flink;
 import org.apache.streampark.console.core.enums.CheckPointStatus;
 import org.apache.streampark.console.core.enums.CheckPointType;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import lombok.Data;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 @Data
 public class CheckPoints implements Serializable {
@@ -33,6 +37,14 @@ public class CheckPoints implements Serializable {
 
   private Latest latest;
 
+  @JsonIgnore
+  public List<CheckPoint> getLatestCheckpoint() {
+    if (Objects.isNull(latest)) {
+      return Collections.emptyList();
+    }
+    return latest.getLatestCheckpoint();
+  }
+
   @Data
   public static class CheckPoint implements Serializable {
     private Long id;
@@ -82,5 +94,18 @@ public class CheckPoints implements Serializable {
   @Data
   public static class Latest implements Serializable {
     private CheckPoint completed;
+    private CheckPoint savepoint;
+
+    @JsonIgnore
+    public List<CheckPoint> getLatestCheckpoint() {
+      List<CheckPoint> checkPoints = new ArrayList<>();
+      if (Objects.nonNull(completed)) {
+        checkPoints.add(completed);
+      }
+      if (Objects.nonNull(savepoint)) {
+        checkPoints.add(savepoint);
+      }
+      return checkPoints;
+    }
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
index 5ffdf2dca..c6b5479f5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
@@ -25,15 +25,21 @@ import org.apache.streampark.console.core.entity.SavePoint;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.service.IService;
 
+import javax.annotation.Nullable;
+
 public interface SavePointService extends IService<SavePoint> {
 
   void expire(Long appId);
 
   SavePoint getLatest(Long id);
 
+  void trigger(Long appId, @Nullable String savepointPath);
+
   Boolean delete(Long id, Application application) throws InternalException;
 
   IPage<SavePoint> page(SavePoint savePoint, RestRequest request);
 
   void removeApp(Application application);
+
+  String getSavePointPath(Application app) throws Exception;
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 76117a323..aaf9ce5ad 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -27,7 +27,6 @@ import org.apache.streampark.common.fs.HdfsOperator;
 import org.apache.streampark.common.fs.LfsOperator;
 import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
-import org.apache.streampark.common.util.FlinkUtils;
 import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
@@ -105,7 +104,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
@@ -1197,7 +1195,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     if (appParam.getSavePointed()) {
       customSavepoint = appParam.getSavePoint();
       if (StringUtils.isBlank(customSavepoint)) {
-        customSavepoint = getSavePointPath(appParam);
+        customSavepoint = savePointService.getSavePointPath(appParam);
       }
     }
 
@@ -1246,6 +1244,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             application.getK8sNamespace(),
             properties);
 
+    final Date triggerTime = new Date();
     CompletableFuture<CancelResponse> cancelFuture =
         CompletableFuture.supplyAsync(() -> FlinkClient.cancel(cancelRequest), 
executorService);
 
@@ -1260,13 +1259,12 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
                 String savePointDir = cancelResponse.savePointDir();
                 log.info("savePoint path: {}", savePointDir);
                 SavePoint savePoint = new SavePoint();
-                Date now = new Date();
                 savePoint.setPath(savePointDir);
                 savePoint.setAppId(application.getId());
                 savePoint.setLatest(true);
                 savePoint.setType(CheckPointType.SAVEPOINT.get());
-                savePoint.setTriggerTime(now);
-                savePoint.setCreateTime(now);
+                savePoint.setCreateTime(new Date());
+                savePoint.setTriggerTime(triggerTime);
                 savePointService.save(savePoint);
               }
               if (isKubernetesApp(application)) {
@@ -1319,7 +1317,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
   public String checkSavepointPath(Application appParam) throws Exception {
     String savepointPath = appParam.getSavePoint();
     if (StringUtils.isBlank(savepointPath)) {
-      savepointPath = getSavePointPath(appParam);
+      savepointPath = savePointService.getSavePointPath(appParam);
     }
 
     if (StringUtils.isNotBlank(savepointPath)) {
@@ -1707,59 +1705,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     return false;
   }
 
-  private String getSavePointPath(Application appParam) throws Exception {
-    Application application = getById(appParam.getId());
-
-    // 1) properties have the highest priority, read the properties are set: 
-Dstate.savepoints.dir
-    String savepointPath =
-        
FlinkClient.extractDynamicPropertiesAsJava(application.getDynamicProperties())
-            .get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
-
-    // Application conf configuration has the second priority. If it is a 
streampark|flinksql type
-    // task,
-    // see if Application conf is configured when the task is defined, if 
checkpoints are configured
-    // and enabled,
-    // read `state.savepoints.dir`
-    if (StringUtils.isBlank(savepointPath)) {
-      if (application.isStreamParkJob() || application.isFlinkSqlJob()) {
-        ApplicationConfig applicationConfig = 
configService.getEffective(application.getId());
-        if (applicationConfig != null) {
-          Map<String, String> map = applicationConfig.readConfig();
-          if (FlinkUtils.isCheckpointEnabled(map)) {
-            savepointPath = 
map.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
-          }
-        }
-      }
-    }
-
-    // 3) If the savepoint is not obtained above, try to obtain the savepoint 
path according to the
-    // deployment type (remote|on yarn)
-    if (StringUtils.isBlank(savepointPath)) {
-      // 3.1) At the remote mode, request the flink webui interface to get the 
savepoint path
-      if (ExecutionMode.isRemoteMode(application.getExecutionMode())) {
-        FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
-        Utils.notNull(
-            cluster,
-            String.format(
-                "The clusterId=%s cannot be find, maybe the clusterId is wrong 
or "
-                    + "the cluster has been deleted. Please contact the 
Admin.",
-                application.getFlinkClusterId()));
-        Map<String, String> config = cluster.getFlinkConfig();
-        if (!config.isEmpty()) {
-          savepointPath = 
config.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
-        }
-      } else {
-        // 3.2) At the yarn or k8s mode, then read the savepoint in 
flink-conf.yml in the bound
-        // flink
-        FlinkEnv flinkEnv = 
flinkEnvService.getById(application.getVersionId());
-        savepointPath =
-            
flinkEnv.convertFlinkYamlAsMap().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
-      }
-    }
-
-    return savepointPath;
-  }
-
   private String getSavePointed(Application appParam) {
     if (appParam.getSavePointed()) {
       if (appParam.getSavePoint() == null) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 1198a62db..a802e173e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -17,33 +17,64 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.CompletableFutureUtils;
+import org.apache.streampark.common.util.FlinkUtils;
+import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.Constant;
 import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.InternalException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
 import org.apache.streampark.console.base.util.CommonUtils;
 import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.ApplicationConfig;
+import org.apache.streampark.console.core.entity.ApplicationLog;
+import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.entity.FlinkEnv;
 import org.apache.streampark.console.core.entity.SavePoint;
 import org.apache.streampark.console.core.enums.CheckPointType;
+import org.apache.streampark.console.core.enums.OptionState;
 import org.apache.streampark.console.core.mapper.SavePointMapper;
+import org.apache.streampark.console.core.service.ApplicationConfigService;
+import org.apache.streampark.console.core.service.ApplicationLogService;
 import org.apache.streampark.console.core.service.ApplicationService;
+import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.SavePointService;
+import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
+import org.apache.streampark.flink.client.bean.SavepointResponse;
+import org.apache.streampark.flink.client.bean.TriggerSavepointRequest;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.jetbrains.annotations.NotNull;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
+import javax.annotation.Nullable;
+
+import java.net.URI;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, 
rollbackFor = Exception.class)
@@ -54,6 +85,24 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
 
   @Autowired private ApplicationService applicationService;
 
+  @Autowired private ApplicationConfigService configService;
+
+  @Autowired private FlinkClusterService flinkClusterService;
+
+  @Autowired private ApplicationLogService applicationLogService;
+
+  @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+
+  private final ExecutorService executorService =
+      new ThreadPoolExecutor(
+          Runtime.getRuntime().availableProcessors() * 5,
+          Runtime.getRuntime().availableProcessors() * 10,
+          60L,
+          TimeUnit.SECONDS,
+          new LinkedBlockingQueue<>(1024),
+          ThreadUtils.threadFactory("trigger-savepoint-executor"),
+          new ThreadPoolExecutor.AbortPolicy());
+
   @Override
   public void expire(Long appId) {
     SavePoint savePoint = new SavePoint();
@@ -171,6 +220,184 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
     return this.getOne(queryWrapper);
   }
 
+  @Override
+  public String getSavePointPath(Application appParam) throws Exception {
+    Application application = applicationService.getById(appParam.getId());
+
+    // 1) Dynamic properties have the highest priority: read whether
+    // -Dstate.savepoints.dir is set.
+    String savepointPath =
+        
FlinkClient.extractDynamicPropertiesAsJava(application.getDynamicProperties())
+            .get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+
+    // 2) The Application conf has the second priority. If it is a
+    // streampark|flinksql type task, check whether an Application conf was
+    // configured when the task was defined; if checkpoints are configured
+    // and enabled, read `state.savepoints.dir` from it.
+    if (StringUtils.isBlank(savepointPath)) {
+      if (application.isStreamParkJob() || application.isFlinkSqlJob()) {
+        ApplicationConfig applicationConfig = 
configService.getEffective(application.getId());
+        if (applicationConfig != null) {
+          Map<String, String> map = applicationConfig.readConfig();
+          if (FlinkUtils.isCheckpointEnabled(map)) {
+            savepointPath = 
map.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+          }
+        }
+      }
+    }
+
+    // 3) If the savepoint is not obtained above, try to obtain the savepoint 
path according to the
+    // deployment type (remote|on yarn)
+    if (StringUtils.isBlank(savepointPath)) {
+      // 3.1) At the remote mode, request the flink webui interface to get the 
savepoint path
+      if (ExecutionMode.isRemoteMode(application.getExecutionMode())) {
+        FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
+        Utils.notNull(
+            cluster,
+            String.format(
+                "The clusterId=%s cannot be find, maybe the clusterId is wrong 
or "
+                    + "the cluster has been deleted. Please contact the 
Admin.",
+                application.getFlinkClusterId()));
+        Map<String, String> config = cluster.getFlinkConfig();
+        if (!config.isEmpty()) {
+          savepointPath = 
config.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+        }
+      } else {
+        // 3.2) At the yarn or k8s mode, then read the savepoint in 
flink-conf.yml in the bound
+        // flink
+        FlinkEnv flinkEnv = 
flinkEnvService.getById(application.getVersionId());
+        savepointPath =
+            
flinkEnv.convertFlinkYamlAsMap().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+      }
+    }
+
+    return savepointPath;
+  }
+
+  @Override
+  public void trigger(Long appId, @Nullable String savepointPath) {
+    log.info("Start to trigger savepoint for app {}", appId);
+    Application application = applicationService.getById(appId);
+
+    FlinkRESTAPIWatcher.addSavepoint(application.getId());
+
+    application.setOptionState(OptionState.SAVEPOINTING.getValue());
+    application.setOptionTime(new Date());
+    this.applicationService.updateById(application);
+    flinkRESTAPIWatcher.init();
+
+    FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
+
+    // infer the final savepoint directory (user-provided path or resolved default)
+    String customSavepoint = this.getFinalSavepointDir(savepointPath, 
application);
+
+    FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
+    String clusterId = getClusterId(application, cluster);
+
+    Map<String, Object> properties = this.tryGetRestProps(application, 
cluster);
+
+    TriggerSavepointRequest request =
+        new TriggerSavepointRequest(
+            flinkEnv.getFlinkVersion(),
+            application.getExecutionModeEnum(),
+            clusterId,
+            application.getJobId(),
+            customSavepoint,
+            application.getK8sNamespace(),
+            properties);
+
+    CompletableFuture<SavepointResponse> savepointFuture =
+        CompletableFuture.supplyAsync(() -> 
FlinkClient.triggerSavepoint(request), executorService);
+
+    handleSavepointResponseFuture(application, savepointFuture);
+  }
+
+  private void handleSavepointResponseFuture(
+      Application application, CompletableFuture<SavepointResponse> 
savepointFuture) {
+    CompletableFutureUtils.runTimeout(
+            savepointFuture,
+            10L,
+            TimeUnit.MINUTES,
+            savepointResponse -> {
+              if (savepointResponse != null && 
savepointResponse.savePointDir() != null) {
+                String savePointDir = savepointResponse.savePointDir();
+                log.info("Request savepoint successful, savepointDir: {}", 
savePointDir);
+              }
+            },
+            e -> {
+              log.error("Trigger savepoint for flink job failed.", e);
+              ApplicationLog log = new ApplicationLog();
+              log.setAppId(application.getId());
+              if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
+                log.setYarnAppId(application.getClusterId());
+              }
+              log.setOptionTime(new Date());
+              String exception = Utils.stringifyException(e);
+              log.setException(exception);
+              if (!(e instanceof TimeoutException)) {
+                log.setSuccess(false);
+              }
+              applicationLogService.save(log);
+            })
+        .whenComplete(
+            (t, e) -> {
+              application.setOptionState(OptionState.NONE.getValue());
+              application.setOptionTime(new Date());
+              applicationService.update(application);
+              flinkRESTAPIWatcher.init();
+            });
+  }
+
+  private String getFinalSavepointDir(@Nullable String savepointPath, 
Application application) {
+    String result = savepointPath;
+    if (StringUtils.isEmpty(savepointPath)) {
+      try {
+        result = this.getSavePointPath(application);
+      } catch (Exception e) {
+        throw new ApiAlertException(
+            "Error in getting savepoint path for triggering savepoint for app "
+                + application.getId(),
+            e);
+      }
+    }
+    return result;
+  }
+
+  @NotNull
+  private Map<String, Object> tryGetRestProps(Application application, 
FlinkCluster cluster) {
+    Map<String, Object> properties = new HashMap<>();
+
+    if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
+      Utils.notNull(
+          cluster,
+          String.format(
+              "The clusterId=%s cannot be find, maybe the clusterId is wrong 
or the cluster has been deleted. Please contact the Admin.",
+              application.getFlinkClusterId()));
+      URI activeAddress = cluster.getRemoteURI();
+      properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
+      properties.put(RestOptions.PORT.key(), activeAddress.getPort());
+    }
+    return properties;
+  }
+
+  private String getClusterId(Application application, FlinkCluster cluster) {
+    if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
+      return application.getClusterId();
+    } else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
+      if 
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
+        Utils.notNull(
+            cluster,
+            String.format(
+                "The yarn session clusterId=%s cannot be find, maybe the 
clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
+                application.getFlinkClusterId()));
+        return cluster.getClusterId();
+      } else {
+        return application.getAppId();
+      }
+    }
+    return null;
+  }
+
   @Override
   @Transactional(rollbackFor = Exception.class)
   public Boolean delete(Long id, Application application) throws 
InternalException {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index 561a689c0..fd6fd5fed 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -28,9 +28,13 @@ import 
org.apache.streampark.console.core.service.alert.AlertService;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import lombok.Data;
+import org.jetbrains.annotations.Nullable;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Nonnull;
+
 import java.util.Date;
 import java.util.Map;
 import java.util.Optional;
@@ -41,9 +45,22 @@ import java.util.concurrent.atomic.AtomicInteger;
 @Component
 public class CheckpointProcessor {
 
+  private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0");
+  private static final Integer SAVEPOINT_CACHE_HOUR = 1;
+
   private final Cache<String, Long> checkPointCache =
       Caffeine.newBuilder().expireAfterAccess(1, TimeUnit.DAYS).build();
 
+  /**
+   * Cache that records whether a savepoint has already been stored in the DB. Uses the
+   * {appId}_{jobID}_{chkId} key from {@link CheckPointKey#getSavePointId()} to trace each
+   * savepoint, ensuring as best as possible that every savepoint is stored into the DB
+   * exactly once. This matters especially when 'maxConcurrent of Checkpoint' > 1, e.g.:
+   * 1. savepoint(n-1) completes after checkpoint(n) has completed;
+   * 2. savepoint(n-1) completes after savepoint(n) has completed.
+   */
+  private final Cache<String, Byte> savepointedCache =
+      Caffeine.newBuilder().expireAfterWrite(SAVEPOINT_CACHE_HOUR, 
TimeUnit.HOURS).build();
+
   private final Map<Long, Counter> checkPointFailedCache = new 
ConcurrentHashMap<>(0);
 
   @Autowired private ApplicationService applicationService;
@@ -52,30 +69,29 @@ public class CheckpointProcessor {
 
   @Autowired private SavePointService savePointService;
 
-  public void process(Long appId, CheckPoints checkPoints) {
-    CheckPoints.Latest latest = checkPoints.getLatest();
-    if (latest == null || latest.getCompleted() == null) {
-      return;
-    }
-    CheckPoints.CheckPoint checkPoint = latest.getCompleted();
-    Application application = applicationService.getById(appId);
+  public void process(Application application, @Nonnull CheckPoints 
checkPoints) {
+    checkPoints.getLatestCheckpoint().forEach(checkPoint -> 
process(application, checkPoint));
+  }
+
+  private void process(Application application, @Nonnull 
CheckPoints.CheckPoint checkPoint) {
+    String jobID = application.getJobId();
+    Long appId = application.getId();
     CheckPointStatus status = checkPoint.getCheckPointStatus();
+    CheckPointKey checkPointKey = new CheckPointKey(appId, jobID, 
checkPoint.getId());
 
     if (CheckPointStatus.COMPLETED.equals(status)) {
-      String cacheId = appId + "_" + application.getJobId();
-      Long latestId =
-          checkPointCache.get(
-              cacheId,
-              key -> {
-                SavePoint savePoint = savePointService.getLatest(appId);
-                return 
Optional.ofNullable(savePoint).map(SavePoint::getChkId).orElse(null);
-              });
-
-      if (latestId == null || latestId < checkPoint.getId()) {
-        saveSavepoint(checkPoint, application);
-        checkPointCache.put(cacheId, checkPoint.getId());
+      if (shouldStoreAsSavepoint(checkPointKey, checkPoint)) {
+        savepointedCache.put(checkPointKey.getSavePointId(), 
DEFAULT_FLAG_BYTE);
+        saveSavepoint(checkPoint, application.getId());
+        return;
       }
-    } else if (CheckPointStatus.FAILED.equals(status) && 
application.cpFailedTrigger()) {
+
+      Long latestChkId = getLatestCheckpointedId(appId, 
checkPointKey.getCheckPointId());
+      if (shouldStoreAsCheckpoint(checkPoint, latestChkId)) {
+        checkPointCache.put(checkPointKey.getCheckPointId(), 
checkPoint.getId());
+        saveSavepoint(checkPoint, application.getId());
+      }
+    } else if (shouldProcessFailedTrigger(checkPoint, 
application.cpFailedTrigger(), status)) {
       Counter counter = checkPointFailedCache.get(appId);
       if (counter == null) {
         checkPointFailedCache.put(appId, new 
Counter(checkPoint.getTriggerTimestamp()));
@@ -108,9 +124,43 @@ public class CheckpointProcessor {
     }
   }
 
-  private void saveSavepoint(CheckPoints.CheckPoint checkPoint, Application 
application) {
+  private static boolean shouldStoreAsCheckpoint(
+      @Nonnull CheckPoints.CheckPoint checkPoint, Long latestId) {
+    return !checkPoint.getIsSavepoint() && (latestId == null || latestId < 
checkPoint.getId());
+  }
+
+  private boolean shouldStoreAsSavepoint(
+      CheckPointKey checkPointKey, @Nonnull CheckPoints.CheckPoint checkPoint) 
{
+    if (!checkPoint.getIsSavepoint()) {
+      return false;
+    }
+    return savepointedCache.getIfPresent(checkPointKey.getSavePointId()) == 
null
+        // If the savepoint was triggered more than SAVEPOINT_CACHE_HOUR ago,
+        // treat it as an out-of-date savepoint and ignore it.
+        && checkPoint.getTriggerTimestamp()
+            >= System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(SAVEPOINT_CACHE_HOUR);
+  }
+
+  @Nullable
+  private Long getLatestCheckpointedId(Long appId, String cacheId) {
+    return checkPointCache.get(
+        cacheId,
+        key -> {
+          SavePoint savePoint = savePointService.getLatest(appId);
+          return 
Optional.ofNullable(savePoint).map(SavePoint::getChkId).orElse(null);
+        });
+  }
+
+  private boolean shouldProcessFailedTrigger(
+      CheckPoints.CheckPoint checkPoint, boolean cpFailedTrigger, 
CheckPointStatus status) {
+    return CheckPointStatus.FAILED.equals(status)
+        && !checkPoint.getIsSavepoint()
+        && cpFailedTrigger;
+  }
+
+  private void saveSavepoint(CheckPoints.CheckPoint checkPoint, Long appId) {
     SavePoint savePoint = new SavePoint();
-    savePoint.setAppId(application.getId());
+    savePoint.setAppId(appId);
     savePoint.setChkId(checkPoint.getId());
     savePoint.setLatest(true);
     savePoint.setType(checkPoint.getCheckPointType().get());
@@ -141,4 +191,28 @@ public class CheckpointProcessor {
       return (currentTimestamp - this.timestamp) / 1000 / 60;
     }
   }
+
+  /** Util class for checkpoint key. */
+  @Data
+  public static class CheckPointKey {
+    private Long appId;
+    private String jobId;
+    private Long checkId;
+
+    public CheckPointKey(Long appId, String jobId, Long checkId) {
+      this.appId = appId;
+      this.jobId = jobId;
+      this.checkId = checkId;
+    }
+
+    /** Get savepoint cache id, see {@link #savepointedCache}. */
+    public String getSavePointId() {
+      return String.format("%s_%s_%s", appId, jobId, checkId);
+    }
+
+    /** Get checkpoint cache id. */
+    public String getCheckPointId() {
+      return String.format("%s_%s", appId, jobId);
+    }
+  }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 4f1734a44..cedb776f4 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -146,7 +146,7 @@ public class FlinkK8sChangeEventListener {
     CheckPoints checkPoint = new CheckPoints();
     checkPoint.setLatest(latest);
 
-    checkpointProcessor.process(event.trackId().appId(), checkPoint);
+    
checkpointProcessor.process(applicationService.getById(event.trackId().appId()),
 checkPoint);
   }
 
   private void setByJobStatusCV(Application app, JobStatusCV jobStatus) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index 6d82c2316..28649ae9e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -354,7 +354,7 @@ public class FlinkRESTAPIWatcher {
     FlinkCluster flinkCluster = getFlinkCluster(application);
     CheckPoints checkPoints = httpCheckpoints(application, flinkCluster);
     if (checkPoints != null) {
-      checkpointProcessor.process(application.getId(), checkPoints);
+      checkpointProcessor.process(application, checkPoints);
     }
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
 
b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
index 9bfbc2d94..2d4097984 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
+++ 
b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
@@ -112,6 +112,7 @@ insert into `t_menu` values (100067, 100015, 'view', null, 
null, 'app:view', nul
 insert into `t_menu` values (100068, 100054, 'view', NULL, NULL, 
'variable:view', NULL, '1', 1, null, now(), now());
 insert into `t_menu` values (100069, 100034, 'view', null, null, 
'setting:view', null, '1', 1, null, now(), now());
 insert into `t_menu` values (100070, 100054, 'depend view', null, null, 
'variable:depend_apps', null, '1', 1, NULL, now(), now());
+insert into `t_menu` values (100071, 100015, 'savepoint trigger', null, null, 
'savepoint:trigger', null, '1', 1, null, now(), now());
 
 -- ----------------------------
 -- Records of t_role
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index a305db5dc..e906eed35 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -190,7 +190,7 @@ create table if not exists `t_flink_savepoint` (
   `app_id` bigint not null,
   `chk_id` bigint default null,
   `type` tinyint default null,
-  `path` varchar(255)  default null,
+  `path` varchar(1024)  default null,
   `latest` tinyint not null default 1,
   `trigger_time` datetime default null,
   `create_time` datetime not null default current_timestamp comment 'create 
time',
diff --git 
a/streampark-console/streampark-console-webapp/src/api/flink/app/savepoint.ts 
b/streampark-console/streampark-console-webapp/src/api/flink/app/savepoint.ts
index 3530c055b..0a758ed1c 100644
--- 
a/streampark-console/streampark-console-webapp/src/api/flink/app/savepoint.ts
+++ 
b/streampark-console/streampark-console-webapp/src/api/flink/app/savepoint.ts
@@ -20,6 +20,7 @@ enum SAVE_POINT_API {
   LATEST = '/flink/savepoint/latest',
   HISTORY = '/flink/savepoint/history',
   DELETE = '/flink/savepoint/delete',
+  TRIGGER = '/flink/savepoint/trigger',
 }
 
 export function fetchLatest(data: Recordable) {
@@ -39,3 +40,11 @@ export function fetchRemoveSavePoint(data: { id: string }): 
Promise<boolean> {
     data,
   });
 }
+
+/**
+ * Trigger a savepoint manually.
+ * @param data app id & optional savepoint path.
+ */
+export function trigger(data: { appId: string | number,  savepointPath: string 
| null}) {
+  return defHttp.post({ url: SAVE_POINT_API.TRIGGER, data });
+}
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index a538bd8f4..8f909eaa1 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -116,6 +116,7 @@ export default {
     refresh: 'refresh',
     start: 'Start Application',
     stop: 'Stop application',
+    savepoint: 'Trigger Savepoint',
     recheck: 'the associated project has changed and this job need to be 
rechecked',
     changed: 'the application has changed.',
   },
@@ -150,6 +151,7 @@ export default {
     launchDetail: 'Launching Progress Detail',
     start: 'Start Application',
     cancel: 'Cancel Application',
+    savepoint: 'Trigger Savepoint',
     detail: 'View Application Detail',
     startLog: 'See Flink Start log',
     force: 'Forced Stop Application',
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index 48aa62338..e30597018 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -114,6 +114,7 @@ export default {
     refresh: '刷新',
     start: '开启应用',
     stop: '停止应用',
+    savepoint: '触发 Savepoint',
     recheck: '关联的项目已更改,需要重新检查此作业',
     changed: '应用程序已更改。',
   },
@@ -148,6 +149,7 @@ export default {
     launchDetail: '发布详情',
     start: '启动作业',
     cancel: '取消作业',
+    savepoint: '触发 Savepoint',
     detail: '查看作业详情',
     startLog: '查看 Flink 启动日志',
     force: '强制停止作业',
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue 
b/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue
index 6dc6e0a41..858e7824c 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue
@@ -36,6 +36,7 @@
   import { useDrawer } from '/@/components/Drawer';
   import { useModal } from '/@/components/Modal';
 
+  import SavepointApplicationModal from 
"./components/AppView/SavepointApplicationModal.vue";
   import StartApplicationModal from 
'./components/AppView/StartApplicationModal.vue';
   import StopApplicationModal from 
'./components/AppView/StopApplicationModal.vue';
   import LogModal from './components/AppView/LogModal.vue';
@@ -48,6 +49,7 @@
     starting: new Map(),
     stopping: new Map(),
     launch: new Map(),
+    savepointing: new Map(),
   };
 
   const appDashboardRef = ref<any>();
@@ -57,6 +59,7 @@
 
   const [registerStartModal, { openModal: openStartModal }] = useModal();
   const [registerStopModal, { openModal: openStopModal }] = useModal();
+  const [registerSavepointModal, { openModal: openSavepointModal }] = 
useModal();
   const [registerLogModal, { openModal: openLogModal }] = useModal();
   const [registerBuildDrawer, { openDrawer: openBuildDrawer }] = useDrawer();
 
@@ -103,6 +106,11 @@
               optionApps.launch.delete(x.id);
             }
           }
+          if (optionApps.savepointing.get(x.id)) {
+            if (timestamp - optionApps.savepointing.get(x.id) > 2000) {
+              optionApps.savepointing.delete(x.id);
+            }
+          }
         }
       });
       return dataSource;
@@ -121,6 +129,7 @@
   const { getTableActions, getActionDropdown, formConfig } = useAppTableAction(
     openStartModal,
     openStopModal,
+    openSavepointModal,
     openLogModal,
     openBuildDrawer,
     handlePageDataReload,
@@ -146,7 +155,7 @@
 
   /* Update options data */
   function handleOptionApp(data: {
-    type: 'starting' | 'stopping' | 'launch';
+    type: 'starting' | 'stopping' | 'launch' | 'savepointing';
     key: any;
     value: any;
   }) {
@@ -253,6 +262,7 @@
     </BasicTable>
     <StartApplicationModal @register="registerStartModal" 
@update-option="handleOptionApp" />
     <StopApplicationModal @register="registerStopModal" 
@update-option="handleOptionApp" />
+    <SavepointApplicationModal @register="registerSavepointModal" 
@update-option="handleOptionApp" />
     <LogModal @register="registerLogModal" />
     <BuildDrawer @register="registerBuildDrawer" />
   </PageWrapper>
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/SavepointApplicationModal.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/SavepointApplicationModal.vue
new file mode 100644
index 000000000..0e06d4bc7
--- /dev/null
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/SavepointApplicationModal.vue
@@ -0,0 +1,127 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      https://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<script lang="ts">
+  import { reactive, defineComponent } from 'vue';
+  import { useI18n } from '/@/hooks/web/useI18n';
+  export default defineComponent({
+    name: 'SavepointApplicationModal',
+  });
+</script>
+
+<script setup lang="ts" name="SavepointApplicationModal">
+  import { BasicForm, useForm } from '/@/components/Form';
+  import { BasicModal, useModalInner } from '/@/components/Modal';
+  import { useMessage } from '/@/hooks/web/useMessage';
+  import { fetchCheckSavepointPath } from "/@/api/flink/app/app";
+  import { trigger } from '/@/api/flink/app/savepoint';
+  const emit = defineEmits(['register', 'updateOption']);
+  const app = reactive<Recordable>({});
+
+  const { t } = useI18n();
+  const { createErrorSwal, Swal } = useMessage();
+  const [registerModal, { closeModal }] = useModalInner((data) => {
+    if (data) {
+      Object.assign(app, data.application);
+      resetFields();
+    }
+  });
+  const [registerForm, { resetFields, validate }] = useForm({
+    name: 'savepointApplicationModal',
+    labelWidth: 120,
+    schemas: [
+      {
+        field: 'customSavepoint',
+        label: 'Custom SavePoint',
+        component: 'Input',
+        componentProps: {
+          placeholder: 'Optional: Entry the custom savepoint path',
+          allowClear: true,
+        },
+        required: false,
+        show: true
+      }
+    ],
+    colon: true,
+    showActionButtonGroup: false,
+    labelCol: { lg: { span: 7, offset: 0 }, sm: { span: 7, offset: 0 } },
+    wrapperCol: { lg: { span: 16, offset: 0 }, sm: { span: 4, offset: 0 } },
+    baseColProps: { span: 24 },
+  });
+
+  /* submit */
+  async function handleSubmit() {
+    try {
+      const { customSavepoint } = (await validate()) as Recordable;
+      const savepointReq = {
+        appId: app.id,
+        savepointPath: customSavepoint,
+      };
+      if (customSavepoint) {
+        const { data } = await fetchCheckSavepointPath({
+          savePoint: customSavepoint,
+        });
+        if (data.data === false) {
+          createErrorSwal('custom savePoint path is invalid, ' + data.message);
+        } else {
+          await handleSavepointAction(savepointReq);
+          emit('updateOption', {
+            type: 'savepointing',
+            key: app.id,
+            value: new Date().getTime(),
+          });
+        }
+      } else {
+        const { data } = await fetchCheckSavepointPath({
+          id: app.id,
+        });
+        if (data.data) {
+          await handleSavepointAction(savepointReq);
+
+        } else {
+          createErrorSwal(data.message);
+        }
+      }
+    } catch (error) {
+      console.error(error);
+    }
+  }
+
+  async function handleSavepointAction(
+    savepointTriggerReq: { appId: string | number,  savepointPath: string | 
null}) {
+    await trigger(savepointTriggerReq);
+    Swal.fire({
+      icon: 'success',
+      title: 'The current savepoint request is sent.',
+      showConfirmButton: false,
+      timer: 2000,
+    });
+    closeModal();
+  }
+</script>
+<template>
+  <BasicModal
+    @register="registerModal"
+    @ok="handleSubmit"
+    :okText="t('common.apply')"
+    :cancelText="t('common.cancelText')"
+  >
+    <template #title>
+      {{ t('flink.app.view.savepoint') }}
+    </template>
+    <BasicForm @register="registerForm" class="!pt-20px" />
+  </BasicModal>
+</template>
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
index 5941bc992..67f3a69ce 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
@@ -36,6 +36,7 @@ export const useFlinkApplication = (openStartModal: Fn) => {
     starting: new Map(),
     stopping: new Map(),
     launch: new Map(),
+    savepointing: new Map(),
   };
 
   /* check */
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
index 9a1f406a3..2df2056a4 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
@@ -38,6 +38,7 @@ import { useI18n } from '/@/hooks/web/useI18n';
 export const useAppTableAction = (
   openStartModal: Fn,
   openStopModal: Fn,
+  openSavepointModal: Fn,
   openLogModal: Fn,
   openBuildDrawer: Fn,
   handlePageDataReload: Fn,
@@ -61,13 +62,13 @@ export const useAppTableAction = (
   } = useFlinkApplication(openStartModal);
 
   /* Operation button */
-  function getTableActions(record: AppListRecord, currentPgaeNo: any): 
ActionItem[] {
+  function getTableActions(record: AppListRecord, currentPageNo: any): 
ActionItem[] {
     return [
       {
         tooltip: { title: t('flink.app.operation.edit') },
         auth: 'app:update',
         icon: 'clarity:note-edit-line',
-        onClick: handleEdit.bind(null, record, currentPgaeNo),
+        onClick: handleEdit.bind(null, record, currentPageNo),
       },
       {
         tooltip: { title: t('flink.app.operation.launch') },
@@ -105,6 +106,14 @@ export const useAppTableAction = (
         icon: 'ant-design:pause-circle-outlined',
         onClick: handleCancel.bind(null, record),
       },
+      {
+        tooltip: { title: t('flink.app.operation.savepoint') },
+        ifShow:
+          record.state == AppStateEnum.RUNNING && record['optionState'] == 
OptionStateEnum.NONE,
+        auth: 'savepoint:trigger',
+        icon: 'ant-design:database-outlined',
+        onClick: handleSavepoint.bind(null, record),
+      },
       {
         tooltip: { title: t('flink.app.operation.detail') },
         auth: 'app:detail',
@@ -148,6 +157,12 @@ export const useAppTableAction = (
     flinkAppStore.setApplicationId(app.id);
     router.push({ path: '/flink/app/detail', query: { appId: app.id } });
   }
+  // click savepoint for application
+  function handleSavepoint(app: AppListRecord) {
+    if (!optionApps.savepointing.get(app.id) || app['optionState'] == 
OptionStateEnum.NONE) {
+      openSavepointModal(true, { application: app });
+    }
+  }
   // click stop application
   function handleCancel(app: AppListRecord) {
     if (!optionApps.stopping.get(app.id) || app['optionState'] == 
OptionStateEnum.NONE) {
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
index bb25d1b97..5c6d69da5 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
@@ -49,6 +49,8 @@ object FlinkClient extends Logger {
 
   private[this] val SHUTDOWN_REQUEST_CLASS_NAME = 
"org.apache.streampark.flink.client.bean.ShutDownRequest"
 
+  private[this] val SAVEPOINT_REQUEST_CLASS_NAME = 
"org.apache.streampark.flink.client.bean.TriggerSavepointRequest"
+
   def submit(submitRequest: SubmitRequest): SubmitResponse = {
     FlinkShimsProxy.proxy(
       submitRequest.flinkVersion,
@@ -62,6 +64,22 @@ object FlinkClient extends Logger {
       })
   }
 
+  def triggerSavepoint(savepointRequest: TriggerSavepointRequest): 
SavepointResponse = {
+    FlinkShimsProxy.proxy(
+      savepointRequest.flinkVersion,
+      (classLoader: ClassLoader) => {
+        val submitClass = 
classLoader.loadClass(FLINK_CLIENT_HANDLER_CLASS_NAME)
+        val requestClass = classLoader.loadClass(SAVEPOINT_REQUEST_CLASS_NAME)
+        val method = submitClass.getDeclaredMethod("triggerSavepoint", 
requestClass)
+        method.setAccessible(true)
+        val obj = method.invoke(null, FlinkShimsProxy.getObject(classLoader, 
savepointRequest))
+        if (obj == null) null
+        else {
+          
FlinkShimsProxy.getObject[SavepointResponse](this.getClass.getClassLoader, obj)
+        }
+      })
+  }
+
   def cancel(stopRequest: CancelRequest): CancelResponse = {
     FlinkShimsProxy.proxy(
       stopRequest.flinkVersion,
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
index 275a378bb..dd14d2ce6 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
@@ -28,8 +28,8 @@ case class CancelRequest(
     executionMode: ExecutionMode,
     clusterId: String,
     jobId: String,
-    withSavePoint: Boolean,
+    override val withSavepoint: Boolean,
     withDrain: Boolean,
-    customSavePointPath: String,
-    kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
-    @Nullable properties: JavaMap[String, Any])
+    savepointPath: String,
+    override val kubernetesNamespace: String = 
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
+    @Nullable properties: JavaMap[String, Any]) extends SavepointRequestTrait 
{}
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
similarity index 75%
copy from 
streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
copy to 
streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
index 275a378bb..37ccc069f 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
@@ -18,18 +18,28 @@
 package org.apache.streampark.flink.client.bean
 
 import java.util.{Map => JavaMap}
-import javax.annotation.Nullable
 
 import org.apache.streampark.common.conf.{FlinkVersion, K8sFlinkConfig}
 import org.apache.streampark.common.enums.ExecutionMode
 
-case class CancelRequest(
-    flinkVersion: FlinkVersion,
-    executionMode: ExecutionMode,
-    clusterId: String,
-    jobId: String,
-    withSavePoint: Boolean,
-    withDrain: Boolean,
-    customSavePointPath: String,
-    kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
-    @Nullable properties: JavaMap[String, Any])
+import javax.annotation.Nullable
+
+trait SavepointRequestTrait {
+
+  val flinkVersion: FlinkVersion
+
+  val executionMode: ExecutionMode
+
+  val clusterId: String
+
+  val jobId: String
+
+  val withSavepoint: Boolean = true
+
+  val savepointPath: String
+
+  val kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE
+
+  @Nullable val properties: JavaMap[String, Any]
+
+}
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointResponse.scala
similarity index 61%
copy from 
streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
copy to 
streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointResponse.scala
index 275a378bb..ca1b4b29a 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointResponse.scala
@@ -17,19 +17,5 @@
 
 package org.apache.streampark.flink.client.bean
 
-import java.util.{Map => JavaMap}
-import javax.annotation.Nullable
-
-import org.apache.streampark.common.conf.{FlinkVersion, K8sFlinkConfig}
-import org.apache.streampark.common.enums.ExecutionMode
-
-case class CancelRequest(
-    flinkVersion: FlinkVersion,
-    executionMode: ExecutionMode,
-    clusterId: String,
-    jobId: String,
-    withSavePoint: Boolean,
-    withDrain: Boolean,
-    customSavePointPath: String,
-    kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
-    @Nullable properties: JavaMap[String, Any])
+/** Result of a trigger-savepoint operation, carrying the savepoint directory path. */
+case class SavepointResponse(savePointDir: String)
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
similarity index 74%
copy from 
streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
copy to 
streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
index 275a378bb..6805de3f1 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
@@ -18,18 +18,18 @@
 package org.apache.streampark.flink.client.bean
 
 import java.util.{Map => JavaMap}
-import javax.annotation.Nullable
 
 import org.apache.streampark.common.conf.{FlinkVersion, K8sFlinkConfig}
 import org.apache.streampark.common.enums.ExecutionMode
 
-case class CancelRequest(
-    flinkVersion: FlinkVersion,
-    executionMode: ExecutionMode,
-    clusterId: String,
-    jobId: String,
-    withSavePoint: Boolean,
-    withDrain: Boolean,
-    customSavePointPath: String,
-    kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
-    @Nullable properties: JavaMap[String, Any])
+import javax.annotation.Nullable
+
+/** Trigger savepoint request. */
+case class TriggerSavepointRequest(flinkVersion: FlinkVersion,
+  executionMode: ExecutionMode,
+  clusterId: String,
+  jobId: String,
+  savepointPath: String,
+  override val kubernetesNamespace: String = 
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
+  @Nullable properties: JavaMap[String, Any]) extends SavepointRequestTrait {
+}
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
index 86200d2b3..b16bcc7b9 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
@@ -36,6 +36,19 @@ object FlinkClientHandler {
     }
   }
 
+  def triggerSavepoint(savepointRequest: TriggerSavepointRequest): 
SavepointResponse = {
+    savepointRequest.executionMode match {
+      case ExecutionMode.LOCAL => 
LocalSubmit.triggerSavepoint(savepointRequest)
+      case ExecutionMode.REMOTE => 
RemoteSubmit.triggerSavepoint(savepointRequest)
+      case ExecutionMode.YARN_APPLICATION => 
YarnApplicationSubmit.triggerSavepoint(savepointRequest)
+      case ExecutionMode.YARN_SESSION => 
YarnSessionSubmit.triggerSavepoint(savepointRequest)
+      case ExecutionMode.YARN_PER_JOB => 
YarnPerJobSubmit.triggerSavepoint(savepointRequest)
+      case ExecutionMode.KUBERNETES_NATIVE_SESSION => 
KubernetesNativeSessionSubmit.triggerSavepoint(savepointRequest)
+      case ExecutionMode.KUBERNETES_NATIVE_APPLICATION => 
KubernetesNativeApplicationSubmit.triggerSavepoint(savepointRequest)
+      case _ => throw new UnsupportedOperationException(s"Unsupported 
${savepointRequest.executionMode} Submit ")
+    }
+  }
+
   def cancel(cancelRequest: CancelRequest): CancelResponse = {
     cancelRequest.executionMode match {
       case ExecutionMode.LOCAL => LocalSubmit.cancel(cancelRequest)
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationSubmit.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationSubmit.scala
index eaa183454..9fbb46460 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationSubmit.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationSubmit.scala
@@ -88,4 +88,8 @@ object KubernetesNativeApplicationSubmit extends 
KubernetesNativeSubmitTrait {
     super.doCancel(cancelRequest, flinkConfig)
   }
 
+  override def doTriggerSavepoint(request: TriggerSavepointRequest, flinkConf: 
Configuration): SavepointResponse = {
+    flinkConf.safeSet(DeploymentOptions.TARGET, 
ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
+    super.doTriggerSavepoint(request, flinkConf)
+  }
 }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionSubmit.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionSubmit.scala
index c5132b658..6f43adb45 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionSubmit.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionSubmit.scala
@@ -117,7 +117,7 @@ object KubernetesNativeSessionSubmit extends 
KubernetesNativeSubmitTrait with Lo
   }
 
   override def doCancel(cancelRequest: CancelRequest, flinkConfig: 
Configuration): CancelResponse = {
-    flinkConfig.safeSet(DeploymentOptions.TARGET, 
ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
+    flinkConfig.safeSet(DeploymentOptions.TARGET, 
ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
     super.doCancel(cancelRequest, flinkConfig)
   }
 
@@ -212,4 +212,9 @@ object KubernetesNativeSessionSubmit extends 
KubernetesNativeSubmitTrait with Lo
       Utils.close(kubeClient)
     }
   }
+
+  override def doTriggerSavepoint(request: TriggerSavepointRequest, 
flinkConfig: Configuration): SavepointResponse = {
+    flinkConfig.safeSet(DeploymentOptions.TARGET, 
ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
+    super.doTriggerSavepoint(request, flinkConfig)
+  }
 }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalSubmit.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalSubmit.scala
index 2f05460c7..0beb70225 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalSubmit.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalSubmit.scala
@@ -65,6 +65,10 @@ object LocalSubmit extends FlinkSubmitTrait {
     }
   }
 
+  override def doTriggerSavepoint(request: TriggerSavepointRequest, 
flinkConfig: Configuration): SavepointResponse = {
+    RemoteSubmit.doTriggerSavepoint(request, flinkConfig)
+  }
+
   override def doCancel(cancelRequest: CancelRequest, flinkConfig: 
Configuration): CancelResponse = {
     RemoteSubmit.doCancel(cancelRequest, flinkConfig)
   }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteSubmit.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteSubmit.scala
index 7fbb80e82..7c154950a 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteSubmit.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteSubmit.scala
@@ -29,7 +29,7 @@ import org.apache.flink.configuration._
 
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.`trait`.FlinkSubmitTrait
-import org.apache.streampark.flink.client.bean.{CancelRequest, CancelResponse, 
SubmitRequest, SubmitResponse}
+import org.apache.streampark.flink.client.bean.{CancelRequest, CancelResponse, 
TriggerSavepointRequest, SavepointRequestTrait, SavepointResponse, 
SubmitRequest, SubmitResponse}
 import org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper
 
 /**
@@ -49,28 +49,34 @@ object RemoteSubmit extends FlinkSubmitTrait {
 
   }
 
-  override def doCancel(cancelRequest: CancelRequest, flinkConfig: 
Configuration): CancelResponse = {
-    flinkConfig
-      .safeSet(DeploymentOptions.TARGET, cancelRequest.executionMode.getName)
-      .safeSet(RestOptions.ADDRESS, 
cancelRequest.properties.get(RestOptions.ADDRESS.key()).toString)
-      .safeSet[JavaInt](RestOptions.PORT, 
cancelRequest.properties.get(RestOptions.PORT.key()).toString.toInt)
-    logInfo(
-      s"""
-         |------------------------------------------------------------------
-         |Effective submit configuration: $flinkConfig
-         |------------------------------------------------------------------
-         |""".stripMargin)
-
-    val standAloneDescriptor = getStandAloneClusterDescriptor(flinkConfig)
+  override def doCancel(request: CancelRequest, flinkConfig: Configuration): 
CancelResponse = {
+    executeClientAction(request, flinkConfig, (jobID, clusterClient) => {
+      CancelResponse(super.cancelJob(request, jobID, clusterClient))
+    })
+  }
+
+  private[this] def executeClientAction[O, R <: 
SavepointRequestTrait](request: R,
+                              flinkConfig: Configuration,
+                              actFunc: (JobID, ClusterClient[_]) => O): O = {
     var client: ClusterClient[StandaloneClusterId] = null
+    var standAloneDescriptor: (StandaloneClusterId, 
StandaloneClusterDescriptor) = null
     try {
+      flinkConfig
+        .safeSet(DeploymentOptions.TARGET, request.executionMode.getName)
+        .safeSet(RestOptions.ADDRESS, 
request.properties.get(RestOptions.ADDRESS.key()).toString)
+        .safeSet[JavaInt](RestOptions.PORT, 
request.properties.get(RestOptions.PORT.key()).toString.toInt)
+      logInfo(
+        s"""
+           |------------------------------------------------------------------
+           |Effective submit configuration: $flinkConfig
+           |------------------------------------------------------------------
+           |""".stripMargin)
+      standAloneDescriptor = getStandAloneClusterDescriptor(flinkConfig)
       client = 
standAloneDescriptor._2.retrieve(standAloneDescriptor._1).getClusterClient
-      val jobID = JobID.fromHexString(cancelRequest.jobId)
-      val actionResult = super.cancelJob(cancelRequest, jobID, client)
-      CancelResponse(actionResult)
+      actFunc(JobID.fromHexString(request.jobId), client)
     } catch {
       case e: Exception =>
-        logError(s"stop flink standalone job fail")
+        logError(s"${request.getClass.getSimpleName} for flink standalone 
job failed")
         e.printStackTrace()
         throw e
     } finally {
@@ -79,6 +85,12 @@ object RemoteSubmit extends FlinkSubmitTrait {
     }
   }
 
+  override def doTriggerSavepoint(request: TriggerSavepointRequest, 
flinkConfig: Configuration): SavepointResponse = {
+    executeClientAction(request, flinkConfig, (jobID, clusterClient) => {
+      SavepointResponse(super.triggerSavepoint(request, jobID, clusterClient))
+    })
+  }
+
   /**
    * Submit flink session job via rest api.
    */
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionSubmit.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionSubmit.scala
index d5e951a55..65f90c94a 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionSubmit.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionSubmit.scala
@@ -135,9 +135,11 @@ object YarnSessionSubmit extends YarnSubmitTrait {
     }
   }
 
-  override def doCancel(cancelRequest: CancelRequest, flinkConfig: 
Configuration): CancelResponse = {
+  private[this] def executeClientAction[O, R <: 
SavepointRequestTrait](request: R,
+                                                                       
flinkConfig: Configuration,
+                                                                       
actFunc: (JobID, ClusterClient[_]) => O): O = {
     flinkConfig
-      .safeSet(YarnConfigOptions.APPLICATION_ID, cancelRequest.clusterId)
+      .safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
       .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
     logInfo(
       s"""
@@ -152,12 +154,10 @@ object YarnSessionSubmit extends YarnSubmitTrait {
       val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
       clusterDescriptor = yarnClusterDescriptor._2
       client = 
clusterDescriptor.retrieve(yarnClusterDescriptor._1).getClusterClient
-      val jobID = JobID.fromHexString(cancelRequest.jobId)
-      val actionResult = super.cancelJob(cancelRequest, jobID, client)
-      CancelResponse(actionResult)
+      actFunc(JobID.fromHexString(request.jobId), client)
     } catch {
       case e: Exception =>
-        logError(s"stop flink yarn session job fail")
+        logError(s"${request.getClass.getSimpleName} for flink yarn session 
job failed")
         e.printStackTrace()
         throw e
     } finally {
@@ -165,6 +165,20 @@ object YarnSessionSubmit extends YarnSubmitTrait {
     }
   }
 
+  override def doCancel(cancelRequest: CancelRequest, flinkConfig: 
Configuration): CancelResponse = {
+    executeClientAction(cancelRequest, flinkConfig, (jobID, clusterClient) => {
+      val actionResult = super.cancelJob(cancelRequest, jobID, clusterClient)
+      CancelResponse(actionResult)
+    })
+  }
+
+  override def doTriggerSavepoint(request: TriggerSavepointRequest, 
flinkConfig: Configuration): SavepointResponse = {
+    executeClientAction(request, flinkConfig, (jobID, clusterClient) => {
+      val actionResult = super.triggerSavepoint(request, jobID, clusterClient)
+      SavepointResponse(actionResult)
+    })
+  }
+
   def deploy(deployRequest: DeployRequest): DeployResponse = {
     logInfo(
       s"""
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkSubmitTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkSubmitTrait.scala
index a5453ca3b..383abf452 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkSubmitTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkSubmitTrait.scala
@@ -142,16 +142,34 @@ trait FlinkSubmitTrait extends Logger {
 
   def setConfig(submitRequest: SubmitRequest, flinkConf: Configuration): Unit
 
+  @throws[Exception]
+  def triggerSavepoint(savepointRequest: TriggerSavepointRequest): 
SavepointResponse = {
+    logInfo(
+      s"""
+         |----------------------------------------- flink job trigger 
savepoint ---------------------
+         |     userFlinkHome  : ${savepointRequest.flinkVersion.flinkHome}
+         |     flinkVersion   : ${savepointRequest.flinkVersion.version}
+         |     clusterId      : ${savepointRequest.clusterId}
+         |     savePointPath  : ${savepointRequest.savepointPath}
+         |     k8sNamespace   : ${savepointRequest.kubernetesNamespace}
+         |     appId          : ${savepointRequest.clusterId}
+         |     jobId          : ${savepointRequest.jobId}
+         
|-------------------------------------------------------------------------------------------
+         |""".stripMargin)
+    val flinkConf = new Configuration()
+    doTriggerSavepoint(savepointRequest, flinkConf)
+  }
+
   @throws[Exception]
   def cancel(cancelRequest: CancelRequest): CancelResponse = {
     logInfo(
       s"""
-         |----------------------------------------- flink job cancel 
--------------------------------------
+         |----------------------------------------- flink job cancel 
--------------------------------
          |     userFlinkHome  : ${cancelRequest.flinkVersion.flinkHome}
          |     flinkVersion   : ${cancelRequest.flinkVersion.version}
          |     clusterId      : ${cancelRequest.clusterId}
-         |     withSavePoint  : ${cancelRequest.withSavePoint}
-         |     savePointPath  : ${cancelRequest.customSavePointPath}
+         |     withSavePoint  : ${cancelRequest.withSavepoint}
+         |     savePointPath  : ${cancelRequest.savepointPath}
          |     withDrain      : ${cancelRequest.withDrain}
          |     k8sNamespace   : ${cancelRequest.kubernetesNamespace}
          |     appId          : ${cancelRequest.clusterId}
@@ -165,6 +183,9 @@ trait FlinkSubmitTrait extends Logger {
   @throws[Exception]
   def doSubmit(submitRequest: SubmitRequest, flinkConf: Configuration): 
SubmitResponse
 
+  @throws[Exception]
+  def doTriggerSavepoint(request: TriggerSavepointRequest, flinkConf: 
Configuration): SavepointResponse
+
   @throws[Exception]
   def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): 
CancelResponse
 
@@ -452,39 +473,56 @@ trait FlinkSubmitTrait extends Logger {
   }
 
   private[client] def cancelJob(cancelRequest: CancelRequest, jobID: JobID, 
client: ClusterClient[_]): String = {
+
+    val savePointDir: String = tryGetSavepointPathIfNeed(cancelRequest)
+
+    val clientTimeout = 
getOptionFromDefaultFlinkConfig(cancelRequest.flinkVersion.flinkHome, 
ClientOptions.CLIENT_TIMEOUT)
+
+    val clientWrapper = new FlinkClusterClient(client)
+
+    (Try(cancelRequest.withSavepoint).getOrElse(false), 
Try(cancelRequest.withDrain).getOrElse(false)) match {
+      case (false, false) =>
+        client.cancel(jobID).get()
+        null
+      case (true, false) => clientWrapper.cancelWithSavepoint(jobID, 
savePointDir).get(clientTimeout.toMillis, TimeUnit.MILLISECONDS)
+      case (_, _) => clientWrapper.stopWithSavepoint(jobID, 
cancelRequest.withDrain, savePointDir).get(clientTimeout.toMillis, 
TimeUnit.MILLISECONDS)
+    }
+  }
+
+  private def tryGetSavepointPathIfNeed(request: SavepointRequestTrait): 
String = {
     val savePointDir = {
-      if (!cancelRequest.withSavePoint) null;
+      if (!request.withSavepoint) null
       else {
-        if (StringUtils.isNotEmpty(cancelRequest.customSavePointPath)) {
-          cancelRequest.customSavePointPath
+        if (StringUtils.isNotEmpty(request.savepointPath)) {
+          request.savepointPath
         } else {
           val configDir = getOptionFromDefaultFlinkConfig[String](
-            cancelRequest.flinkVersion.flinkHome,
+            request.flinkVersion.flinkHome,
             ConfigOptions.key(CheckpointingOptions.SAVEPOINT_DIRECTORY.key())
               .stringType()
               .defaultValue {
-                if (cancelRequest.executionMode == 
ExecutionMode.YARN_APPLICATION) {
+                if (request.executionMode == ExecutionMode.YARN_APPLICATION) {
                   Workspace.remote.APP_SAVEPOINTS
                 } else null
               })
           if (StringUtils.isEmpty(configDir)) {
-            throw new FlinkException(s"[StreamPark] executionMode: 
${cancelRequest.executionMode.getName}, savePoint path is null or invalid.")
+            throw new FlinkException(s"[StreamPark] executionMode: 
${request.executionMode.getName}, savePoint path is null or invalid.")
           } else configDir
         }
       }
     }
+    savePointDir
+  }
 
-    val clientTimeout = 
getOptionFromDefaultFlinkConfig(cancelRequest.flinkVersion.flinkHome, 
ClientOptions.CLIENT_TIMEOUT)
+  private[client] def triggerSavepoint(savepointRequest: 
TriggerSavepointRequest, jobID: JobID, client: ClusterClient[_]): String = {
+
+    val savepointPath = tryGetSavepointPathIfNeed(savepointRequest)
+
+    val clientTimeout = 
getOptionFromDefaultFlinkConfig(savepointRequest.flinkVersion.flinkHome, 
ClientOptions.CLIENT_TIMEOUT)
 
     val clientWrapper = new FlinkClusterClient(client)
 
-    (Try(cancelRequest.withSavePoint).getOrElse(false), 
Try(cancelRequest.withDrain).getOrElse(false)) match {
-      case (false, false) =>
-        client.cancel(jobID).get()
-        null
-      case (true, false) => clientWrapper.cancelWithSavepoint(jobID, 
savePointDir).get(clientTimeout.toMillis, TimeUnit.MILLISECONDS)
-      case (_, _) => clientWrapper.stopWithSavepoint(jobID, 
cancelRequest.withDrain, savePointDir).get(clientTimeout.toMillis, 
TimeUnit.MILLISECONDS)
-    }
+    clientWrapper.triggerSavepoint(jobID, 
savepointPath).get(clientTimeout.toMillis, TimeUnit.MILLISECONDS)
   }
 
 }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeSubmitTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeSubmitTrait.scala
index 325162123..bfdacc575 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeSubmitTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeSubmitTrait.scala
@@ -80,14 +80,24 @@ trait KubernetesNativeSubmitTrait extends FlinkSubmitTrait {
   // Tip: Perhaps it would be better to let users freely specify the savepoint 
directory
   @throws[Exception]
   override def doCancel(cancelRequest: CancelRequest, flinkConfig: 
Configuration): CancelResponse = {
+    executeClientAction(cancelRequest, flinkConfig, (jobId, clusterClient) => {
+      val actionResult = super.cancelJob(cancelRequest, jobId, clusterClient)
+      IngressController.deleteIngress(cancelRequest.clusterId, 
cancelRequest.kubernetesNamespace)
+      CancelResponse(actionResult)
+    })
+  }
 
+  private[this] def executeClientAction[O, R <: 
SavepointRequestTrait](request: R,
+                                                                flinkConfig: 
Configuration,
+                                                                actFunc: 
(JobID, ClusterClient[_]) => O): O = {
+    val hints = s"[flink-submit] execute ${request.getClass.getSimpleName} for 
flink job failed,"
     require(
-      StringUtils.isNotBlank(cancelRequest.clusterId),
-      s"[flink-submit] stop flink job failed, clusterId is null, 
mode=${flinkConfig.get(DeploymentOptions.TARGET)}")
+      StringUtils.isNotBlank(request.clusterId),
+      s"$hints clusterId is null, 
mode=${flinkConfig.get(DeploymentOptions.TARGET)}")
 
     flinkConfig
-      .safeSet(KubernetesConfigOptions.CLUSTER_ID, cancelRequest.clusterId)
-      .safeSet(KubernetesConfigOptions.NAMESPACE, 
cancelRequest.kubernetesNamespace)
+      .safeSet(KubernetesConfigOptions.CLUSTER_ID, request.clusterId)
+      .safeSet(KubernetesConfigOptions.NAMESPACE, request.kubernetesNamespace)
 
     var clusterDescriptor: KubernetesClusterDescriptor = null
     var client: ClusterClient[String] = null
@@ -95,13 +105,10 @@ trait KubernetesNativeSubmitTrait extends FlinkSubmitTrait 
{
     try {
       clusterDescriptor = getK8sClusterDescriptor(flinkConfig)
       client = 
clusterDescriptor.retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID)).getClusterClient
-      val jobID = JobID.fromHexString(cancelRequest.jobId)
-      val actionResult = super.cancelJob(cancelRequest, jobID, client)
-      IngressController.deleteIngress(cancelRequest.clusterId, 
cancelRequest.kubernetesNamespace)
-      CancelResponse(actionResult)
+      actFunc(JobID.fromHexString(request.jobId), client)
     } catch {
       case e: Exception =>
-        logger.error(s"[flink-submit] stop flink job failed, 
mode=${flinkConfig.get(DeploymentOptions.TARGET)}, 
cancelRequest=${cancelRequest}")
+        logger.error(s"${hints} 
mode=${flinkConfig.get(DeploymentOptions.TARGET)}, request=${request}")
         throw e
     } finally {
       if (client != null) client.close()
@@ -109,6 +116,16 @@ trait KubernetesNativeSubmitTrait extends FlinkSubmitTrait 
{
     }
   }
 
+  @throws[Exception]
+  override def doTriggerSavepoint(request: TriggerSavepointRequest, 
flinkConfig: Configuration): SavepointResponse = {
+
+    executeClientAction(request, flinkConfig, (jobId, clusterClient) => {
+      val actionResult = super.triggerSavepoint(request.asInstanceOf, jobId, 
clusterClient)
+      IngressController.deleteIngress(request.clusterId, 
request.kubernetesNamespace)
+      SavepointResponse(actionResult)
+    })
+  }
+
   // noinspection DuplicatedCode
   /*
     tips:
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnSubmitTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnSubmitTrait.scala
index 1aaa5f491..45b8259fd 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnSubmitTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnSubmitTrait.scala
@@ -17,11 +17,13 @@
 
 package org.apache.streampark.flink.client.`trait`
 
+import org.apache.flink.api.common.JobID
+
 import java.lang.{Boolean => JavaBool}
 import java.lang.reflect.Method
 import scala.util.Try
 import org.apache.flink.client.deployment.{ClusterDescriptor, 
ClusterSpecification, DefaultClusterClientServiceLoader}
-import org.apache.flink.client.program.ClusterClientProvider
+import org.apache.flink.client.program.{ClusterClient, ClusterClientProvider}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.util.FlinkException
@@ -36,10 +38,11 @@ import org.apache.streampark.flink.client.bean._
  */
 trait YarnSubmitTrait extends FlinkSubmitTrait {
 
-  override def doCancel(cancelRequest: CancelRequest, flinkConf: 
Configuration): CancelResponse = {
-    val jobID = getJobID(cancelRequest.jobId)
+  private[this] def executeClientAction[R <: SavepointRequestTrait, 
O](request: R, flinkConf: Configuration,
+                                                         actionFunc: (JobID, 
ClusterClient[_]) => O): O = {
+    val jobID = getJobID(request.jobId)
     val clusterClient = {
-      flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, 
cancelRequest.clusterId)
+      flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
       val clusterClientFactory = new YarnClusterClientFactory
       val applicationId = clusterClientFactory.getClusterId(flinkConf)
       if (applicationId == null) {
@@ -50,14 +53,26 @@ trait YarnSubmitTrait extends FlinkSubmitTrait {
       clusterDescriptor.retrieve(applicationId).getClusterClient
     }
     Try {
-      val savepointDir = super.cancelJob(cancelRequest, jobID, clusterClient)
-      CancelResponse(savepointDir)
+      actionFunc(jobID, clusterClient)
     }.recover {
       case e => throw new FlinkException(
-          s"[StreamPark] Triggering a savepoint for the job 
${cancelRequest.jobId} failed. detail: ${Utils.stringifyException(e)}");
+        s"[StreamPark] Do ${request.getClass.getSimpleName} for the job 
${request.jobId} failed. " +
+          s"detail: ${Utils.stringifyException(e)}");
     }.get
   }
 
+  override def doTriggerSavepoint(request: TriggerSavepointRequest, flinkConf: 
Configuration): SavepointResponse = {
+    executeClientAction(request, flinkConf, (jid, client) => {
+            SavepointResponse(super.triggerSavepoint(request, jid, client))
+    })
+  }
+
+  override def doCancel(cancelRequest: CancelRequest, flinkConf: 
Configuration): CancelResponse = {
+    executeClientAction(cancelRequest, flinkConf, (jid, client) => {
+      CancelResponse(super.cancelJob(cancelRequest, jid, client))
+    })
+  }
+
   private lazy val deployInternalMethod: Method = {
     val paramClass = Array(
       classOf[ClusterSpecification],
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
index 23e89e6f3..dd5cc5315 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkClientTrait.scala
@@ -24,6 +24,10 @@ import org.apache.flink.client.program.ClusterClient
 
 abstract class FlinkClientTrait[T](clusterClient: ClusterClient[T]) {
 
+  def triggerSavepoint(jobID: JobID, savepointDir: String): 
CompletableFuture[String] = {
+    clusterClient.triggerSavepoint(jobID, savepointDir)
+  }
+
  // Cancel the job after writing a savepoint to `s`; shim default that
  // version-specific subclasses override when newer overloads exist.
  def cancelWithSavepoint(jobID: JobID, s: String): CompletableFuture[String] 
= {
    clusterClient.cancelWithSavepoint(jobID, s)
  }
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
index 2ca1e9c90..d7828e461 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -24,6 +24,10 @@ import org.apache.flink.core.execution.SavepointFormatType
 
 class FlinkClusterClient[T](clusterClient: ClusterClient[T]) extends 
FlinkClientTrait[T](clusterClient) {
 
+  override def triggerSavepoint(jobID: JobID, savepointDir: String): 
CompletableFuture[String] = {
+    clusterClient.triggerSavepoint(jobID, savepointDir, 
SavepointFormatType.DEFAULT)
+  }
+
  // Flink 1.15 overload: pass the default savepoint format explicitly.
  override def cancelWithSavepoint(jobID: JobID, savepointDirectory: String): 
CompletableFuture[String] = {
    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, 
SavepointFormatType.DEFAULT)
  }
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
index 2ca1e9c90..d7828e461 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -24,6 +24,10 @@ import org.apache.flink.core.execution.SavepointFormatType
 
 class FlinkClusterClient[T](clusterClient: ClusterClient[T]) extends 
FlinkClientTrait[T](clusterClient) {
 
+  override def triggerSavepoint(jobID: JobID, savepointDir: String): 
CompletableFuture[String] = {
+    clusterClient.triggerSavepoint(jobID, savepointDir, 
SavepointFormatType.DEFAULT)
+  }
+
  // Flink 1.16 overload: pass the default savepoint format explicitly.
  override def cancelWithSavepoint(jobID: JobID, savepointDirectory: String): 
CompletableFuture[String] = {
    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, 
SavepointFormatType.DEFAULT)
  }

Reply via email to