This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new a4f64c248 [Improve] trigger savepoint bug fixed.
a4f64c248 is described below
commit a4f64c248158956dc48009d64305425cea79770e
Author: benjobs <[email protected]>
AuthorDate: Wed Mar 6 23:51:21 2024 +0800
[Improve] trigger savepoint bug fixed.
---
.../console/core/service/impl/SavePointServiceImpl.java | 1 -
.../streampark/console/core/task/FlinkAppHttpWatcher.java | 14 +++++++++++++-
2 files changed, 13 insertions(+), 2 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 67e828a13..687026e4a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -334,7 +334,6 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
savepointResponse -> {
if (savepointResponse != null &&
savepointResponse.savePointDir() != null) {
applicationLog.setSuccess(true);
- flinkAppHttpWatcher.cleanSavepoint(application);
String savePointDir = savepointResponse.savePointDir();
log.info("Request savepoint successful, savepointDir: {}",
savePointDir);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index a5ee4c158..08a7f7902 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -449,6 +449,7 @@ public class FlinkAppHttpWatcher {
} else {
WATCHING_APPS.put(appId, application);
}
+
StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(appId);
StateChangeEvent nowEvent = StateChangeEvent.of(application);
if (!nowEvent.equals(event)) {
@@ -617,8 +618,12 @@ public class FlinkAppHttpWatcher {
}
public void cleanSavepoint(Application application) {
- SAVEPOINT_CACHE.invalidate(application.getId());
application.setOptionState(OptionState.NONE.getValue());
+ StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(application.getId());
+ if (event != null && event.getOptionState() == OptionState.SAVEPOINTING) {
+ doPersistMetrics(application, false);
+ }
+ SAVEPOINT_CACHE.invalidate(application.getId());
}
/** set current option state */
@@ -648,6 +653,13 @@ public class FlinkAppHttpWatcher {
}
log.info("[StreamPark][FlinkAppHttpWatcher] add app to
savepoint,appId:{}", appId);
SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
+
+ // update to PREVIOUS_STATUS
+ StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(appId);
+ if (event != null) {
+ event.setOptionState(OptionState.SAVEPOINTING);
+ PREVIOUS_STATUS.put(appId, event);
+ }
}
public static void unWatching(Long appId) {