This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 1a75c82bf [Imporve] changeStateEvnet equals improvements (#3603)
1a75c82bf is described below
commit 1a75c82bf3399162bb8adfeaee4ce7484bd12164
Author: benjobs <[email protected]>
AuthorDate: Thu Mar 7 13:05:10 2024 +0800
[Imporve] changeStateEvnet equals improvements (#3603)
* [Imporve] changeStateEvnet equals improvements
* [Improve] trigger savepoint FE pop-window improvements
---------
Co-authored-by: benjobs <[email protected]>
---
.../console/core/service/impl/SavePointServiceImpl.java | 14 ++++++++------
.../streampark/console/core/task/FlinkAppHttpWatcher.java | 6 ++----
.../src/views/flink/app/hooks/useSavepoint.tsx | 1 +
3 files changed, 11 insertions(+), 10 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 687026e4a..bee73b6b8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -265,15 +265,17 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
if (!config.isEmpty()) {
savepointPath =
config.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
}
- } else {
- // 3.2) At the yarn or k8s mode, then read the savepoint in
flink-conf.yml in the bound
- // flink
- FlinkEnv flinkEnv =
flinkEnvService.getById(application.getVersionId());
- savepointPath =
-
flinkEnv.convertFlinkYamlAsMap().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
}
}
+ // 3.2) read the savepoint in flink-conf.yml in the bound
+ if (StringUtils.isBlank(savepointPath)) {
+ // flink
+ FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
+ savepointPath =
+
flinkEnv.convertFlinkYamlAsMap().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+ }
+
return savepointPath;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 08a7f7902..1c721b455 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -845,12 +845,10 @@ public class FlinkAppHttpWatcher {
return false;
}
StateChangeEvent that = (StateChangeEvent) object;
- if (optionState != that.optionState) {
- return false;
- }
return Objects.equals(id, that.id)
&& Objects.equals(jobId, that.jobId)
- && appState == that.appState
+ && Objects.equals(appState, that.appState)
+ && Objects.equals(optionState, that.optionState)
&& Objects.equals(jobManagerUrl, that.jobManagerUrl);
}
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
index ea0ddb326..f84e1ff62 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
@@ -106,6 +106,7 @@ export const useSavepoint = (updateOption: Fn) => {
const { data } = await fetchCheckSavepointPath({ id: appId.value
});
if (data.data) {
await handleSavepointAction(savepointReq);
+ resolve(true);
} else {
await createErrorSwal(data.message);
}