This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 45f169ab6 [Improve] Graceful stop flink job with savepoint (#3796)
45f169ab6 is described below
commit 45f169ab6627ada7a07aa8ce2f62aa31a7c2f7e5
Author: mzzx <[email protected]>
AuthorDate: Wed Jun 26 23:53:04 2024 +0800
[Improve] Graceful stop flink job with savepoint (#3796)
Co-authored-by: benjobs <[email protected]>
---
.../core/service/application/impl/ApplicationInfoServiceImpl.java | 3 ++-
.../apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java | 1 +
.../org/apache/streampark/flink/client/trait/FlinkClientTrait.scala | 4 ----
3 files changed, 3 insertions(+), 5 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 5bf8c2157..14c708f4f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -245,7 +245,8 @@ public class ApplicationInfoServiceImpl extends ServiceImpl<ApplicationMapper, A
@Override
public boolean checkAlter(Application appParam) {
Long appId = appParam.getId();
- if (FlinkAppStateEnum.CANCELED != appParam.getStateEnum()) {
+ if (FlinkAppStateEnum.CANCELED != appParam.getStateEnum()
+ && FlinkAppStateEnum.FINISHED != appParam.getStateEnum()) {
return false;
}
long cancelUserId = FlinkAppHttpWatcher.getCanceledJobUserId(appId);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index b0664d0be..d65d4f918 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -483,6 +483,7 @@ public class FlinkAppHttpWatcher {
doPersistMetrics(application, false);
break;
case CANCELED:
+ case FINISHED:
log.info(
"[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job state {}, stop tracking and delete stopFrom!",
currentState.name());
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index d936980a9..d0a1bf14a 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -538,10 +538,6 @@ trait FlinkClientTrait extends Logger {
case (false, false) =>
client.cancel(jobID).get()
null
- case (true, false) =>
- clientWrapper
- .cancelWithSavepoint(jobID, savePointDir, cancelRequest.nativeFormat)
- .get()
case (_, _) =>
clientWrapper
.stopWithSavepoint(