This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 46f2d205c [Bug]Solve the jobid is empty cause canceled inaccurate Patch (#1604)
46f2d205c is described below
commit 46f2d205c4c9d7764d4a2d6757207e9218be741c
Author: monster <[email protected]>
AuthorDate: Thu Sep 15 23:44:59 2022 +0800
[Bug]Solve the jobid is empty cause canceled inaccurate Patch (#1604)
* [Bug]Solve the jobid is empty cause canceled inaccurate 2
* Update ApplicationServiceImpl.java
---
.../console/core/service/impl/ApplicationServiceImpl.java | 6 ++----
.../flink/submit/impl/KubernetesNativeApplicationSubmit.scala | 2 +-
2 files changed, 3 insertions(+), 5 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 61316b96a..980c9a0d8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -574,7 +574,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
appParam.setLaunch(LaunchState.NEED_LAUNCH.get());
appParam.setOptionState(OptionState.NONE.getValue());
appParam.setCreateTime(new Date());
- appParam.setJobId(new JobID().toHexString());
appParam.doSetHotParams();
if (appParam.isUploadJob()) {
String jarPath =
WebUtils.getAppTempDir().getAbsolutePath().concat("/").concat(appParam.getJar());
@@ -1188,6 +1187,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
assert application != null;
+ application.setJobId(new JobID().toHexString());
// if manually started, clear the restart flag
if (!auto) {
application.setRestartCount(0);
@@ -1379,9 +1379,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
}
application.setAppId(submitResponse.clusterId());
- if (StringUtils.isNoneEmpty(submitResponse.jobId())) {
- application.setJobId(submitResponse.jobId());
- }
+
if (StringUtils.isNoneEmpty(submitResponse.jobManagerUrl())) {
application.setJobManagerUrl(submitResponse.jobManagerUrl());
applicationLog.setJobManagerUrl(submitResponse.jobManagerUrl());
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
index c21ff0c88..92b2a3899 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
@@ -71,7 +71,7 @@ object KubernetesNativeApplicationSubmit extends KubernetesNativeSubmitTrait {
.getClusterClient
val clusterId = clusterClient.getClusterId
- val result = SubmitResponse(clusterId, flinkConfig.toMap)
+ val result = SubmitResponse(clusterId, flinkConfig.toMap,
submitRequest.flinkJobID)
logInfo(s"[flink-submit] flink job has been submitted.
${flinkConfIdentifierInfo(flinkConfig)}")
result
} catch {