This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new b828b5059 [Bug] save checkpoint bug fixed. (#4002)
b828b5059 is described below
commit b828b5059807bb36b18885ea2e839c55a2a98a2b
Author: benjobs <[email protected]>
AuthorDate: Thu Aug 29 00:28:09 2024 +0800
[Bug] save checkpoint bug fixed. (#4002)
* [Bug] checkpoint chk-id bug fixed.
---
.../core/service/impl/ApplicationServiceImpl.java | 6 ---
.../console/core/task/CheckpointProcessor.java | 60 +++++++++++-----------
2 files changed, 29 insertions(+), 37 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 1e1d5ae8f..8686a68d1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1702,12 +1702,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
// 3) success
applicationLog.setSuccess(true);
- // issue: https://github.com/apache/incubator-streampark/issues/3749
- if (appParam.getRestoreOrTriggerSavepoint() == null
- || !appParam.getRestoreOrTriggerSavepoint()) {
- checkpointProcessor.resetCheckpointNum(appParam.getId());
- }
-
if (response.flinkConfig() != null) {
String jmMemory =
response.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY());
if (jmMemory != null) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index 2f6f5bb90..22451eb5c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -83,19 +83,29 @@ public class CheckpointProcessor {
CheckPointKey checkPointKey = new CheckPointKey(appId, jobID,
checkPoint.getId());
if (CheckPointStatus.COMPLETED.equals(status)) {
- if (checkSaveAsSavepoint(checkPointKey, checkPoint)) {
- savepointedCache.put(checkPointKey.getSavePointId(),
DEFAULT_FLAG_BYTE);
- saveSavepoint(checkPoint, application.getId());
- flinkAppHttpWatcher.cleanSavepoint(application);
- return;
+ switch (checkPoint.getCheckPointType()) {
+ case SAVEPOINT:
+ if (checkSaveForSavepoint(checkPointKey, checkPoint)) {
+ savepointedCache.put(checkPointKey.getSavePointId(),
DEFAULT_FLAG_BYTE);
+ saveSavepoint(checkPoint, application.getId());
+ flinkAppHttpWatcher.cleanSavepoint(application);
+ return;
+ }
+ break;
+ case CHECKPOINT:
+ Long latestChkId = getLatestCheckpointId(appId,
checkPointKey.getCheckPointId());
+ if (checkSaveForCheckpoint(checkPoint, latestChkId)) {
+ checkPointCache.put(checkPointKey.getCheckPointId(),
checkPoint.getId());
+ saveSavepoint(checkPoint, application.getId());
+ }
+ break;
+ default:
+ break;
}
+ return;
+ }
- Long latestChkId = getLatestCheckpointId(appId,
checkPointKey.getCheckPointId());
- if (checkSaveAsCheckpoint(checkPoint, latestChkId)) {
- checkPointCache.put(checkPointKey.getCheckPointId(),
checkPoint.getId());
- saveSavepoint(checkPoint, application.getId());
- }
- } else if (shouldProcessFailedTrigger(checkPoint,
application.cpFailedTrigger(), status)) {
+ if (shouldProcessFailedTrigger(checkPoint, application.cpFailedTrigger(),
status)) {
Counter counter = checkPointFailedCache.get(appId);
if (counter == null) {
checkPointFailedCache.put(appId, new
Counter(checkPoint.getTriggerTimestamp()));
@@ -131,27 +141,15 @@ public class CheckpointProcessor {
}
}
- // issue: https://github.com/apache/incubator-streampark/issues/3749
- public void resetCheckpointNum(Long appId) {
- checkPointCache
- .asMap()
- .forEach(
- (k, v) -> {
- if (k.startsWith(appId.toString())) {
- checkPointCache.invalidate(k);
- }
- });
- }
-
- private boolean checkSaveAsCheckpoint(@Nonnull CheckPoints.CheckPoint
checkPoint, Long latestId) {
- return !checkPoint.getIsSavepoint() && (latestId == null || latestId <
checkPoint.getId());
+ private boolean checkSaveForCheckpoint(
+ @Nonnull CheckPoints.CheckPoint checkPoint, Long latestId) {
+ return checkPoint.getId().equals(1L)
+ || latestId == null
+ || !latestId.equals(checkPoint.getId());
}
- private boolean checkSaveAsSavepoint(
+ private boolean checkSaveForSavepoint(
CheckPointKey checkPointKey, @Nonnull CheckPoints.CheckPoint checkPoint)
{
- if (!checkPoint.getIsSavepoint()) {
- return false;
- }
return savepointedCache.getIfPresent(checkPointKey.getSavePointId()) ==
null
// If the savepoint triggered before SAVEPOINT_CACHE_HOUR span, we'll
see it as out-of-time
// savepoint and ignore it.
@@ -160,9 +158,9 @@ public class CheckpointProcessor {
}
@Nullable
- private Long getLatestCheckpointId(Long appId, String cacheId) {
+ private Long getLatestCheckpointId(Long appId, String checkpointId) {
return checkPointCache.get(
- cacheId,
+ checkpointId,
key -> {
Savepoint savepoint = savepointService.getLatest(appId);
return
Optional.ofNullable(savepoint).map(Savepoint::getChkId).orElse(null);