This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 46f2d205c [Bug]Solve the jobid is empty cause canceled inaccurate Patch (#1604)
46f2d205c is described below

commit 46f2d205c4c9d7764d4a2d6757207e9218be741c
Author: monster <[email protected]>
AuthorDate: Thu Sep 15 23:44:59 2022 +0800

    [Bug]Solve the jobid is empty cause canceled inaccurate Patch (#1604)
    
    * [Bug]Solve the jobid is empty cause canceled inaccurate 2
    
    * Update ApplicationServiceImpl.java
---
 .../console/core/service/impl/ApplicationServiceImpl.java           | 6 ++----
 .../flink/submit/impl/KubernetesNativeApplicationSubmit.scala       | 2 +-
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 61316b96a..980c9a0d8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -574,7 +574,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         appParam.setLaunch(LaunchState.NEED_LAUNCH.get());
         appParam.setOptionState(OptionState.NONE.getValue());
         appParam.setCreateTime(new Date());
-        appParam.setJobId(new JobID().toHexString());
         appParam.doSetHotParams();
         if (appParam.isUploadJob()) {
             String jarPath = 
WebUtils.getAppTempDir().getAbsolutePath().concat("/").concat(appParam.getJar());
@@ -1188,6 +1187,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
         assert application != null;
 
+        application.setJobId(new JobID().toHexString());
         // if manually started, clear the restart flag
         if (!auto) {
             application.setRestartCount(0);
@@ -1379,9 +1379,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                     }
                 }
                 application.setAppId(submitResponse.clusterId());
-                if (StringUtils.isNoneEmpty(submitResponse.jobId())) {
-                    application.setJobId(submitResponse.jobId());
-                }
+
                 if (StringUtils.isNoneEmpty(submitResponse.jobManagerUrl())) {
                     
application.setJobManagerUrl(submitResponse.jobManagerUrl());
                     
applicationLog.setJobManagerUrl(submitResponse.jobManagerUrl());
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
index c21ff0c88..92b2a3899 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
@@ -71,7 +71,7 @@ object KubernetesNativeApplicationSubmit extends KubernetesNativeSubmitTrait {
         .getClusterClient
 
       val clusterId = clusterClient.getClusterId
-      val result = SubmitResponse(clusterId, flinkConfig.toMap)
+      val result = SubmitResponse(clusterId, flinkConfig.toMap, 
submitRequest.flinkJobID)
       logInfo(s"[flink-submit] flink job has been submitted. 
${flinkConfIdentifierInfo(flinkConfig)}")
       result
     } catch {

Reply via email to