This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new 4b9544e7a [Improve] flink job restore state from checkpoint bug fixed.
#3749 (#3767)
4b9544e7a is described below
commit 4b9544e7a7ef60f528fa27c1240db50c3343f4e3
Author: benjobs <[email protected]>
AuthorDate: Tue Jun 18 18:46:07 2024 +0800
[Improve] flink job restore state from checkpoint bug fixed. #3749 (#3767)
Co-authored-by: benjobs <[email protected]>
---
.../console/core/service/impl/ApplicationServiceImpl.java | 5 +++++
.../streampark/console/core/task/CheckpointProcessor.java | 12 ++++++++++++
2 files changed, 17 insertions(+)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 9fedb6083..33affd619 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1695,6 +1695,11 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
// 3) success
applicationLog.setSuccess(true);
+ // issue: https://github.com/apache/incubator-streampark/issues/3749
+ if (appParam.getSavePointed() == null || !appParam.getSavePointed())
{
+ checkpointProcessor.resetCheckpointNum(appParam.getId());
+ }
+
if (response.flinkConfig() != null) {
String jmMemory =
response.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY());
if (jmMemory != null) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index 96f4c2cfa..e08d37102 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -131,6 +131,18 @@ public class CheckpointProcessor {
}
}
+ // issue: https://github.com/apache/incubator-streampark/issues/3749
+ public void resetCheckpointNum(Long appId) {
+ checkPointCache
+ .asMap()
+ .forEach(
+ (k, v) -> {
+ if (k.startsWith(appId.toString())) {
+ checkPointCache.invalidate(k);
+ }
+ });
+ }
+
private boolean checkSaveAsCheckpoint(@Nonnull CheckPoints.CheckPoint
checkPoint, Long latestId) {
return !checkPoint.getIsSavepoint() && (latestId == null || latestId <
checkPoint.getId());
}