This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new 884728e6e [Improve] savepoint minor improvement
884728e6e is described below

commit 884728e6efc137e5d8f003b0cf73e7cae023c92c
Author: benjobs <[email protected]>
AuthorDate: Wed Sep 4 19:16:56 2024 +0800

    [Improve] savepoint minor improvement
---
 .../console/core/mapper/SavepointMapper.java       |  2 +
 .../console/core/service/SavepointService.java     |  2 +-
 .../core/service/impl/ApplicationServiceImpl.java  | 20 ++++++--
 .../core/service/impl/SavepointServiceImpl.java    | 54 +++++++++++++---------
 .../main/resources/mapper/core/SavepointMapper.xml |  6 +++
 .../flink/client/trait/FlinkClientTrait.scala      | 36 ++-------------
 6 files changed, 58 insertions(+), 62 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java
index e79b54ae1..690463c43 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java
@@ -25,4 +25,6 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 
 public interface SavepointMapper extends BaseMapper<Savepoint> {
   Savepoint findLatestByTime(@Param("appId") Long appId);
+
+  void cleanLatest(@Param("appId") Long appId);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
index 2c31d77a3..f40e8e7a6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
@@ -33,7 +33,7 @@ public interface SavepointService extends IService<Savepoint> {
 
   Savepoint getLatest(Long id);
 
-  void trigger(Long appId, @Nullable String savepointPath);
+  void trigger(Long appId, @Nullable String savepointPath) throws Exception;
 
   Boolean delete(Long id, Application application) throws InternalException;
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index ed38dabd7..0d2f3decd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1309,6 +1309,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     applicationLog.setAppId(application.getId());
     applicationLog.setJobManagerUrl(application.getJobManagerUrl());
     applicationLog.setOptionTime(new Date());
+
     if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
       applicationLog.setYarnAppId(application.getClusterId());
     }
@@ -1336,13 +1337,22 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     String customSavepoint = null;
     if (appParam.getRestoreOrTriggerSavepoint()) {
       customSavepoint = appParam.getSavepointPath();
-      if (customSavepoint == null) {
-        customSavepoint =
-            savepointService.processPath(
-                customSavepoint, application.getJobName(), application.getId());
-      } else {
+      if (StringUtils.isBlank(customSavepoint)) {
         customSavepoint = savepointService.getSavePointPath(appParam);
       }
+      if (StringUtils.isBlank(customSavepoint)
+          || application.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
+        customSavepoint = Workspace.remote().APP_SAVEPOINTS();
+      }
+      if (StringUtils.isNotBlank(customSavepoint)) {
+        savepointService.processPath(
+            customSavepoint, application.getJobName(), application.getId());
+      } else {
+        throw new IllegalArgumentException(
+            String.format(
+                "[StreamPark] executionMode: %s, savePoint path is null or invalid.",
+                application.getExecutionModeEnum().getName()));
+      }
     }
 
     Map<String, Object> properties = new HashMap<>();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index 61b025b01..977afbb51 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -17,13 +17,13 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.RestRequest;
-import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.InternalException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
 import org.apache.streampark.console.base.util.CommonUtils;
@@ -111,6 +111,7 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
           TimeUnit.SECONDS,
           new LinkedBlockingQueue<>(),
           ThreadUtils.threadFactory("streampark-flink-savepoint-trigger"));
+  @Autowired private SavepointMapper savepointMapper;
 
   @Override
   public void expire(Long appId) {
@@ -121,13 +122,6 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
     this.update(savepoint, queryWrapper);
   }
 
-  @Override
-  public boolean save(Savepoint entity) {
-    this.expire(entity);
-    this.expire(entity.getAppId());
-    return super.save(entity);
-  }
-
   private void expire(Savepoint entity) {
     FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
     Application application = applicationService.getById(entity.getAppId());
@@ -321,7 +315,19 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
   }
 
   @Override
-  public void trigger(Long appId, @Nullable String savepointPath) {
+  public boolean save(Savepoint savepoint) {
+    this.expire(savepoint);
+    this.expire(savepoint.getAppId());
+    this.cleanLatest(savepoint.getAppId());
+    return super.save(savepoint);
+  }
+
+  private void cleanLatest(Long appId) {
+    savepointMapper.cleanLatest(appId);
+  }
+
+  @Override
+  public void trigger(Long appId, @Nullable String savepointPath) throws Exception {
     log.info("Start to trigger savepoint for app {}", appId);
     Application application = applicationService.getById(appId);
 
@@ -407,21 +413,23 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
             });
   }
 
-  private String getFinalSavepointDir(@Nullable String savepointPath, Application application) {
+  private String getFinalSavepointDir(@Nullable String savepointPath, Application application)
+      throws Exception {
     String result = savepointPath;
-    if (StringUtils.isEmpty(savepointPath)) {
-      try {
-        result = this.getSavePointPath(application);
-      } catch (Exception e) {
-        log.error(
-            "Error in getting savepoint path for triggering savepoint for app:{}",
-            application.getId(),
-            e);
-        throw new ApiAlertException(
-            "Error in getting savepoint path for triggering savepoint for app "
-                + application.getId(),
-            e);
-      }
+    if (StringUtils.isBlank(savepointPath)) {
+      result = this.getSavePointPath(application);
+    }
+    if (StringUtils.isBlank(result)
+        || application.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
+      result = Workspace.remote().APP_SAVEPOINTS();
+    }
+    if (StringUtils.isNotBlank(result)) {
+      processPath(result, application.getJobName(), application.getId());
+    } else {
+      throw new IllegalArgumentException(
+          String.format(
+              "[StreamPark] executionMode: %s, savePoint path is null or invalid.",
+              application.getExecutionModeEnum().getName()));
     }
     return result;
   }
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml
index c7829428f..af4ccbe8f 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml
@@ -34,4 +34,10 @@
         limit 1;
     </select>
 
+    <update id="cleanLatest">
+        update t_flink_savepoint
+        set latest = 0
+        where app_id = #{appId}
+    </update>
+
 </mapper>
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index c9bc9b59d..df61ee9d0 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -475,8 +475,6 @@ trait FlinkClientTrait extends Logger {
       jobID: JobID,
       client: ClusterClient[_]): String = {
 
-    val savePointDir: String = tryGetSavepointPathIfNeed(cancelRequest)
-
     val clientWrapper = new FlinkClusterClient(client)
     val withSavepoint = Try(cancelRequest.withSavepoint).getOrElse(false)
     val withDrain = Try(cancelRequest.withDrain).getOrElse(false)
@@ -487,49 +485,21 @@ trait FlinkClientTrait extends Logger {
         null
       case (true, false) =>
         clientWrapper
-          .cancelWithSavepoint(jobID, savePointDir)
+          .cancelWithSavepoint(jobID, cancelRequest.savepointPath)
           .get()
       case (_, _) =>
         clientWrapper
-          .stopWithSavepoint(jobID, cancelRequest.withDrain, savePointDir)
+          .stopWithSavepoint(jobID, cancelRequest.withDrain, cancelRequest.savepointPath)
           .get()
     }
   }
 
-  private def tryGetSavepointPathIfNeed(request: SavepointRequestTrait): String = {
-    if (!request.withSavepoint) null
-    else {
-      if (StringUtils.isNotEmpty(request.savepointPath)) {
-        request.savepointPath
-      } else {
-        val configDir = getOptionFromDefaultFlinkConfig[String](
-          request.flinkVersion.flinkHome,
-          ConfigOptions
-            .key(CheckpointingOptions.SAVEPOINT_DIRECTORY.key())
-            .stringType()
-            .defaultValue {
-              if (request.executionMode == ExecutionMode.YARN_APPLICATION) {
-                Workspace.remote.APP_SAVEPOINTS
-              } else null
-            }
-        )
-
-        if (StringUtils.isEmpty(configDir)) {
-          throw new FlinkException(
-            s"[StreamPark] executionMode: ${request.executionMode.getName}, savePoint path is null or invalid.")
-        } else configDir
-
-      }
-    }
-  }
-
   private[client] def triggerSavepoint(
       savepointRequest: TriggerSavepointRequest,
       jobID: JobID,
       client: ClusterClient[_]): String = {
-    val savepointPath = tryGetSavepointPathIfNeed(savepointRequest)
     val clientWrapper = new FlinkClusterClient(client)
-    clientWrapper.triggerSavepoint(jobID, savepointPath).get()
+    clientWrapper.triggerSavepoint(jobID, savepointRequest.savepointPath).get()
   }
 
   def closeSubmit(submitRequest: SubmitRequest, close: AutoCloseable*): Unit = {

Reply via email to