This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 03fffdaf2 [Improve] trigger savepoint improvements
03fffdaf2 is described below
commit 03fffdaf2be754c0a4f56070e394094a981c67df
Author: benjobs <[email protected]>
AuthorDate: Wed Mar 6 18:19:24 2024 +0800
[Improve] trigger savepoint improvements
---
.../console/core/entity/Application.java | 1 +
.../core/service/impl/SavePointServiceImpl.java | 1 +
.../console/core/task/FlinkAppHttpWatcher.java | 24 +++++++++++++---------
.../src/views/flink/app/hooks/useSavepoint.tsx | 9 +++++---
4 files changed, 22 insertions(+), 13 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index cda37c306..cb08078ef 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -120,6 +120,7 @@ public class Application implements Serializable {
/** has restart count */
private Integer restartCount;
+ @TableField(updateStrategy = FieldStrategy.IGNORED)
private Integer optionState;
/** alert id */
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 687026e4a..67e828a13 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -334,6 +334,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
savepointResponse -> {
if (savepointResponse != null &&
savepointResponse.savePointDir() != null) {
applicationLog.setSuccess(true);
+ flinkAppHttpWatcher.cleanSavepoint(application);
String savePointDir = savepointResponse.savePointDir();
log.info("Request savepoint successful, savepointDir: {}",
savePointDir);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 226c3a561..a5ee4c158 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -403,15 +403,16 @@ public class FlinkAppHttpWatcher {
NEED_RESTART_AFTER_ROLLBACK (Need to restart after rollback)
NEED_RESTART_AFTER_DEPLOY (Need to restart after deploy)
*/
+ Long appId = application.getId();
if (OptionState.STARTING.equals(optionState)) {
- Application latestApp = WATCHING_APPS.get(application.getId());
+ Application latestApp = WATCHING_APPS.get(appId);
ReleaseState releaseState = latestApp.getReleaseState();
switch (releaseState) {
case NEED_RESTART:
case NEED_ROLLBACK:
LambdaUpdateWrapper<Application> updateWrapper =
new LambdaUpdateWrapper<Application>()
- .eq(Application::getId, application.getId())
+ .eq(Application::getId, appId)
.set(Application::getRelease, ReleaseState.DONE.get());
applicationService.update(updateWrapper);
break;
@@ -422,17 +423,18 @@ public class FlinkAppHttpWatcher {
// The current state is running, and there is a current task in the savePointCache,
// indicating that the task is doing savepoint
- if (SAVEPOINT_CACHE.getIfPresent(application.getId()) != null) {
+ if (SAVEPOINT_CACHE.getIfPresent(appId) != null) {
application.setOptionState(OptionState.SAVEPOINTING.getValue());
} else {
application.setOptionState(OptionState.NONE.getValue());
}
application.setState(currentState.getValue());
doPersistMetrics(application, false);
- cleanOptioning(optionState, application.getId());
+ cleanOptioning(optionState, appId);
}
private void doPersistMetrics(Application application, boolean stopWatch) {
+ Long appId = application.getId();
if (FlinkAppState.isEndState(application.getState())) {
application.setOverview(null);
application.setTotalTM(null);
@@ -441,16 +443,16 @@ public class FlinkAppHttpWatcher {
application.setAvailableSlot(null);
application.setJmMemory(null);
application.setTmMemory(null);
- unWatching(application.getId());
+ unWatching(appId);
} else if (stopWatch) {
- unWatching(application.getId());
+ unWatching(appId);
} else {
- WATCHING_APPS.put(application.getId(), application);
+ WATCHING_APPS.put(appId, application);
}
- StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(application.getId());
+ StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(appId);
StateChangeEvent nowEvent = StateChangeEvent.of(application);
if (!nowEvent.equals(event)) {
- PREVIOUS_STATUS.put(application.getId(), nowEvent);
+ PREVIOUS_STATUS.put(appId, nowEvent);
applicationService.persistMetrics(application);
}
}
@@ -831,10 +833,12 @@ public class FlinkAppHttpWatcher {
return false;
}
StateChangeEvent that = (StateChangeEvent) object;
+ if (optionState != that.optionState) {
+ return false;
+ }
return Objects.equals(id, that.id)
&& Objects.equals(jobId, that.jobId)
&& appState == that.appState
- && optionState == that.optionState
&& Objects.equals(jobManagerUrl, that.jobManagerUrl);
}
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
index 106c33cdc..ea0ddb326 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
@@ -90,7 +90,7 @@ export const useSavepoint = (updateOption: Fn) => {
savePoint: unref(customSavepoint),
});
if (data.data === false) {
- createErrorSwal('custom savepoint path is invalid, ' +
data.message);
+ await createErrorSwal('custom savepoint path is invalid, ' +
data.message);
reject('custom savepoint path is invalid');
} else {
await handleSavepointAction(savepointReq);
@@ -104,8 +104,11 @@ export const useSavepoint = (updateOption: Fn) => {
}
} else {
const { data } = await fetchCheckSavepointPath({ id: appId.value
});
- if (data.data) await handleSavepointAction(savepointReq);
- else createErrorSwal(data.message);
+ if (data.data) {
+ await handleSavepointAction(savepointReq);
+ } else {
+ await createErrorSwal(data.message);
+ }
reject();
}
} catch (error) {