This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 762006143 [Improve] check checkpoint minor improvement
762006143 is described below
commit 7620061437a2a1cbea15f032ea3e5e665f15a7a5
Author: benjobs <[email protected]>
AuthorDate: Tue Feb 13 16:37:17 2024 +0800
[Improve] check checkpoint minor improvement
---
.../apache/streampark/console/core/task/CheckpointProcessor.java | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index 092f6b6ae..8fc19a78b 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -83,7 +83,7 @@ public class CheckpointProcessor {
CheckPointKey checkPointKey = new CheckPointKey(appId, jobID,
checkPoint.getId());
if (CheckPointStatus.COMPLETED.equals(status)) {
- if (shouldStoreAsSavepoint(checkPointKey, checkPoint)) {
+ if (checkSaveAsSavepoint(checkPointKey, checkPoint)) {
savepointedCache.put(checkPointKey.getSavePointId(),
DEFAULT_FLAG_BYTE);
saveSavepoint(checkPoint, application.getId());
flinkAppHttpWatcher.cleanSavepoint(application);
@@ -91,7 +91,7 @@ public class CheckpointProcessor {
}
Long latestChkId = getLatestCheckpointedId(appId,
checkPointKey.getCheckPointId());
- if (shouldStoreAsCheckpoint(checkPoint, latestChkId)) {
+ if (checkSaveAsCheckpoint(checkPoint, latestChkId)) {
checkPointCache.put(checkPointKey.getCheckPointId(),
checkPoint.getId());
saveSavepoint(checkPoint, application.getId());
}
@@ -131,12 +131,11 @@ public class CheckpointProcessor {
}
}
- private static boolean shouldStoreAsCheckpoint(
- @Nonnull CheckPoints.CheckPoint checkPoint, Long latestId) {
+ private boolean checkSaveAsCheckpoint(@Nonnull CheckPoints.CheckPoint checkPoint, Long latestId) {
return !checkPoint.getIsSavepoint() && (latestId == null || latestId < checkPoint.getId());
}
- private boolean shouldStoreAsSavepoint(
+ private boolean checkSaveAsSavepoint(
CheckPointKey checkPointKey, @Nonnull CheckPoints.CheckPoint checkPoint)
{
if (!checkPoint.getIsSavepoint()) {
return false;