This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new 171f7eefd [Improve] checkpoint save bug fixed. (#4006)
171f7eefd is described below
commit 171f7eefdde2b2137e270a8893a938c82bdc943a
Author: benjobs <[email protected]>
AuthorDate: Thu Aug 29 17:17:45 2024 +0800
[Improve] checkpoint save bug fixed. (#4006)
---
.../apache/streampark/console/core/mapper/SavepointMapper.java | 6 +++++-
.../console/core/service/impl/SavepointServiceImpl.java | 6 +++++-
.../apache/streampark/console/core/task/CheckpointProcessor.java | 4 +---
.../src/main/resources/mapper/core/SavepointMapper.xml | 8 ++++++++
4 files changed, 19 insertions(+), 5 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java
index 8332b0ae7..e79b54ae1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java
@@ -19,6 +19,10 @@ package org.apache.streampark.console.core.mapper;
import org.apache.streampark.console.core.entity.Savepoint;
+import org.apache.ibatis.annotations.Param;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-public interface SavepointMapper extends BaseMapper<Savepoint> {}
+public interface SavepointMapper extends BaseMapper<Savepoint> {
+ Savepoint findLatestByTime(@Param("appId") Long appId);
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index cf3c630e1..ead9934a8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -232,7 +232,11 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
new LambdaQueryWrapper<Savepoint>()
.eq(Savepoint::getAppId, id)
.eq(Savepoint::getLatest, true);
- return this.getOne(queryWrapper);
+ Savepoint savepoint = this.baseMapper.selectOne(queryWrapper);
+ if (savepoint == null) {
+ savepoint = this.baseMapper.findLatestByTime(id);
+ }
+ return savepoint;
}
@Override
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index 22451eb5c..8a4c5b09c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -143,9 +143,7 @@ public class CheckpointProcessor {
private boolean checkSaveForCheckpoint(
@Nonnull CheckPoints.CheckPoint checkPoint, Long latestId) {
- return checkPoint.getId().equals(1L)
- || latestId == null
- || !latestId.equals(checkPoint.getId());
+ return latestId == null || !latestId.equals(checkPoint.getId());
}
private boolean checkSaveForSavepoint(
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml
index 99ff6b427..c7829428f 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml
@@ -26,4 +26,12 @@
<result column="trigger_time" jdbcType="DATE" property="triggerTime"/>
<result column="create_time" jdbcType="DATE" property="createTime"/>
</resultMap>
+
+ <select id="findLatestByTime"
resultType="org.apache.streampark.console.core.entity.Savepoint">
+ select * from t_flink_savepoint
+ where app_id = #{appId}
+ order by trigger_time desc
+ limit 1;
+ </select>
+
</mapper>