This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new 884728e6e [Improve] savepoint minor improvement
884728e6e is described below
commit 884728e6efc137e5d8f003b0cf73e7cae023c92c
Author: benjobs <[email protected]>
AuthorDate: Wed Sep 4 19:16:56 2024 +0800
[Improve] savepoint minor improvement
---
.../console/core/mapper/SavepointMapper.java | 2 +
.../console/core/service/SavepointService.java | 2 +-
.../core/service/impl/ApplicationServiceImpl.java | 20 ++++++--
.../core/service/impl/SavepointServiceImpl.java | 54 +++++++++++++---------
.../main/resources/mapper/core/SavepointMapper.xml | 6 +++
.../flink/client/trait/FlinkClientTrait.scala | 36 ++-------------
6 files changed, 58 insertions(+), 62 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java
index e79b54ae1..690463c43 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java
@@ -25,4 +25,6 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface SavepointMapper extends BaseMapper<Savepoint> {
Savepoint findLatestByTime(@Param("appId") Long appId);
+
+ void cleanLatest(@Param("appId") Long appId);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
index 2c31d77a3..f40e8e7a6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
@@ -33,7 +33,7 @@ public interface SavepointService extends IService<Savepoint> {
Savepoint getLatest(Long id);
- void trigger(Long appId, @Nullable String savepointPath);
+ void trigger(Long appId, @Nullable String savepointPath) throws Exception;
Boolean delete(Long id, Application application) throws InternalException;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index ed38dabd7..0d2f3decd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1309,6 +1309,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
applicationLog.setAppId(application.getId());
applicationLog.setJobManagerUrl(application.getJobManagerUrl());
applicationLog.setOptionTime(new Date());
+
if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
applicationLog.setYarnAppId(application.getClusterId());
}
@@ -1336,13 +1337,22 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
String customSavepoint = null;
if (appParam.getRestoreOrTriggerSavepoint()) {
customSavepoint = appParam.getSavepointPath();
- if (customSavepoint == null) {
- customSavepoint =
- savepointService.processPath(
- customSavepoint, application.getJobName(), application.getId());
- } else {
+ if (StringUtils.isBlank(customSavepoint)) {
customSavepoint = savepointService.getSavePointPath(appParam);
}
+ if (StringUtils.isBlank(customSavepoint)
+ || application.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
+ customSavepoint = Workspace.remote().APP_SAVEPOINTS();
+ }
+ if (StringUtils.isNotBlank(customSavepoint)) {
+ savepointService.processPath(
+ customSavepoint, application.getJobName(), application.getId());
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "[StreamPark] executionMode: %s, savePoint path is null or invalid.",
+ application.getExecutionModeEnum().getName()));
+ }
}
Map<String, Object> properties = new HashMap<>();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index 61b025b01..977afbb51 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -17,13 +17,13 @@
package org.apache.streampark.console.core.service.impl;
+import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.RestRequest;
-import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.base.util.CommonUtils;
@@ -111,6 +111,7 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
ThreadUtils.threadFactory("streampark-flink-savepoint-trigger"));
+ @Autowired private SavepointMapper savepointMapper;
@Override
public void expire(Long appId) {
@@ -121,13 +122,6 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
this.update(savepoint, queryWrapper);
}
- @Override
- public boolean save(Savepoint entity) {
- this.expire(entity);
- this.expire(entity.getAppId());
- return super.save(entity);
- }
-
private void expire(Savepoint entity) {
FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
Application application = applicationService.getById(entity.getAppId());
@@ -321,7 +315,19 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
}
@Override
- public void trigger(Long appId, @Nullable String savepointPath) {
+ public boolean save(Savepoint savepoint) {
+ this.expire(savepoint);
+ this.expire(savepoint.getAppId());
+ this.cleanLatest(savepoint.getAppId());
+ return super.save(savepoint);
+ }
+
+ private void cleanLatest(Long appId) {
+ savepointMapper.cleanLatest(appId);
+ }
+
+ @Override
+ public void trigger(Long appId, @Nullable String savepointPath) throws Exception {
log.info("Start to trigger savepoint for app {}", appId);
Application application = applicationService.getById(appId);
@@ -407,21 +413,23 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
});
}
- private String getFinalSavepointDir(@Nullable String savepointPath, Application application) {
+ private String getFinalSavepointDir(@Nullable String savepointPath, Application application)
+ throws Exception {
String result = savepointPath;
- if (StringUtils.isEmpty(savepointPath)) {
- try {
- result = this.getSavePointPath(application);
- } catch (Exception e) {
- log.error(
- "Error in getting savepoint path for triggering savepoint for app:{}",
- application.getId(),
- e);
- throw new ApiAlertException(
- "Error in getting savepoint path for triggering savepoint for app "
- + application.getId(),
- e);
- }
+ if (StringUtils.isBlank(savepointPath)) {
+ result = this.getSavePointPath(application);
+ }
+ if (StringUtils.isBlank(result)
+ || application.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
+ result = Workspace.remote().APP_SAVEPOINTS();
+ }
+ if (StringUtils.isNotBlank(result)) {
+ processPath(result, application.getJobName(), application.getId());
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "[StreamPark] executionMode: %s, savePoint path is null or invalid.",
+ application.getExecutionModeEnum().getName()));
}
return result;
}
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml
index c7829428f..af4ccbe8f 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml
@@ -34,4 +34,10 @@
limit 1;
</select>
+ <update id="cleanLatest">
+ update t_flink_savepoint
+ set latest = 0
+ where app_id = #{appId}
+ </update>
+
</mapper>
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index c9bc9b59d..df61ee9d0 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -475,8 +475,6 @@ trait FlinkClientTrait extends Logger {
jobID: JobID,
client: ClusterClient[_]): String = {
- val savePointDir: String = tryGetSavepointPathIfNeed(cancelRequest)
-
val clientWrapper = new FlinkClusterClient(client)
val withSavepoint = Try(cancelRequest.withSavepoint).getOrElse(false)
val withDrain = Try(cancelRequest.withDrain).getOrElse(false)
@@ -487,49 +485,21 @@ trait FlinkClientTrait extends Logger {
null
case (true, false) =>
clientWrapper
- .cancelWithSavepoint(jobID, savePointDir)
+ .cancelWithSavepoint(jobID, cancelRequest.savepointPath)
.get()
case (_, _) =>
clientWrapper
- .stopWithSavepoint(jobID, cancelRequest.withDrain, savePointDir)
+ .stopWithSavepoint(jobID, cancelRequest.withDrain, cancelRequest.savepointPath)
.get()
}
}
- private def tryGetSavepointPathIfNeed(request: SavepointRequestTrait): String = {
- if (!request.withSavepoint) null
- else {
- if (StringUtils.isNotEmpty(request.savepointPath)) {
- request.savepointPath
- } else {
- val configDir = getOptionFromDefaultFlinkConfig[String](
- request.flinkVersion.flinkHome,
- ConfigOptions
- .key(CheckpointingOptions.SAVEPOINT_DIRECTORY.key())
- .stringType()
- .defaultValue {
- if (request.executionMode == ExecutionMode.YARN_APPLICATION) {
- Workspace.remote.APP_SAVEPOINTS
- } else null
- }
- )
-
- if (StringUtils.isEmpty(configDir)) {
- throw new FlinkException(
- s"[StreamPark] executionMode: ${request.executionMode.getName}, savePoint path is null or invalid.")
- } else configDir
-
- }
- }
- }
-
private[client] def triggerSavepoint(
savepointRequest: TriggerSavepointRequest,
jobID: JobID,
client: ClusterClient[_]): String = {
- val savepointPath = tryGetSavepointPathIfNeed(savepointRequest)
val clientWrapper = new FlinkClusterClient(client)
- clientWrapper.triggerSavepoint(jobID, savepointPath).get()
+ clientWrapper.triggerSavepoint(jobID, savepointRequest.savepointPath).get()
}
def closeSubmit(submitRequest: SubmitRequest, close: AutoCloseable*): Unit = {