This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 13fdb36a4 [Bug] flinkclousterid cannot be saved in yarn-session mode 
(#1608)
13fdb36a4 is described below

commit 13fdb36a4130ad6d2f16a0c3c0dc2ba0ba7451b0
Author: xujiangfeng001 <[email protected]>
AuthorDate: Thu Sep 15 14:00:30 2022 +0800

    [Bug] flinkclousterid cannot be saved in yarn-session mode (#1608)
    
    * [Bug] flinkclousterid cannot be saved in yarn-session mode
---
 .../core/service/impl/ApplicationServiceImpl.java  | 12 +++++-----
 .../core/service/impl/FlinkClusterServiceImpl.java |  2 ++
 .../console/core/task/FlinkTrackingTask.java       | 27 +++++++++++++---------
 .../src/views/flink/app/Add.vue                    | 16 +++++++++++--
 4 files changed, 38 insertions(+), 19 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 6fb414953..61316b96a 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1049,9 +1049,9 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
                 optionMap.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueue);
             }
             if 
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
-                String yarnSessionClusterId = (String) 
application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_ID());
-                assert yarnSessionClusterId != null;
-                extraParameter.put(ConfigConst.KEY_YARN_APP_ID(), 
yarnSessionClusterId);
+                FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
+                assert cluster != null;
+                extraParameter.put(ConfigConst.KEY_YARN_APP_ID(), 
cluster.getClusterId());
             }
         }
 
@@ -1284,9 +1284,9 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
                 dynamicOption.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueue);
             }
             if 
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
-                String yarnSessionClusterId = (String) 
application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_ID());
-                assert yarnSessionClusterId != null;
-                extraParameter.put(ConfigConst.KEY_YARN_APP_ID(), 
yarnSessionClusterId);
+                FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
+                assert cluster != null;
+                extraParameter.put(ConfigConst.KEY_YARN_APP_ID(), 
cluster.getClusterId());
             }
         }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 19f9cc367..648f7c4d3 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -31,6 +31,7 @@ import 
org.apache.streampark.console.core.service.CommonService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.SettingService;
+import org.apache.streampark.console.core.task.FlinkTrackingTask;
 import org.apache.streampark.flink.submit.FlinkSubmitter;
 import org.apache.streampark.flink.submit.bean.DeployRequest;
 import org.apache.streampark.flink.submit.bean.DeployResponse;
@@ -188,6 +189,7 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
                     updateWrapper.set(FlinkCluster::getException, null);
                     update(updateWrapper);
                     result.setStatus(1);
+                    FlinkTrackingTask.removeFlinkCluster(flinkCluster);
                 } else {
                     result.setStatus(0);
                     result.setMsg("deploy cluster failed," + 
deployResponse.message());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
index 796e034b7..95fa66a51 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
@@ -269,9 +269,9 @@ public class FlinkTrackingTask {
         FlinkCluster flinkCluster = getFlinkCluster(application);
         JobsOverview jobsOverview = httpJobsOverview(application, 
flinkCluster);
         Optional<JobsOverview.Job> optional;
-        if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
-            optional = jobsOverview.getJobs().size() > 1 ? 
jobsOverview.getJobs().stream().filter(a ->
-                StringUtils.equals(application.getJobId(), 
a.getId())).findFirst() : jobsOverview.getJobs().stream().findFirst();
+        ExecutionMode execMode = application.getExecutionModeEnum();
+        if (ExecutionMode.YARN_APPLICATION.equals(execMode) || 
ExecutionMode.YARN_PER_JOB.equals(execMode)) {
+            optional = jobsOverview.getJobs().size() > 1 ? 
jobsOverview.getJobs().stream().filter(a -> 
StringUtils.equals(application.getJobId(), a.getId())).findFirst() : 
jobsOverview.getJobs().stream().findFirst();
         } else {
             optional = jobsOverview.getJobs().stream().filter(x -> 
x.getId().equals(application.getJobId())).findFirst();
         }
@@ -590,6 +590,13 @@ public class FlinkTrackingTask {
         SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
     }
 
+    public static void removeFlinkCluster(FlinkCluster flinkCluster) {
+        if (FLINK_CLUSTER_MAP.containsKey(flinkCluster.getId())) {
+            log.info("remove flink cluster:{}", flinkCluster.getId());
+            FLINK_CLUSTER_MAP.remove(flinkCluster.getId());
+        }
+    }
+
     /**
      * Reload the latest application to the database to avoid the problem of 
inconsistency between the data of cache and database.
      *
@@ -724,7 +731,8 @@ public class FlinkTrackingTask {
 
     private JobsOverview httpJobsOverview(Application application, 
FlinkCluster flinkCluster) throws Exception {
         final String flinkUrl = "jobs/overview";
-        if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
+        ExecutionMode execMode = application.getExecutionModeEnum();
+        if (ExecutionMode.YARN_PER_JOB.equals(execMode) || 
ExecutionMode.YARN_APPLICATION.equals(execMode)) {
             String reqURL;
             if (StringUtils.isEmpty(application.getJobManagerUrl())) {
                 String format = "proxy/%s/" + flinkUrl;
@@ -734,12 +742,8 @@ public class FlinkTrackingTask {
                 reqURL = String.format(format, application.getJobManagerUrl());
             }
             JobsOverview jobsOverview = yarnRestRequest(reqURL, 
JobsOverview.class);
-            if (jobsOverview != null && 
ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
-                List<JobsOverview.Job> jobs = 
jobsOverview.getJobs().stream().filter(x -> 
x.getId().equals(application.getJobId())).collect(Collectors.toList());
-                jobsOverview.setJobs(jobs);
-            }
             return jobsOverview;
-        } else if (ExecutionMode.isRemoteMode(application.getExecutionMode())) 
{
+        } else if (ExecutionMode.REMOTE.equals(execMode) || 
ExecutionMode.YARN_SESSION.equals(execMode)) {
             if (application.getJobId() != null) {
                 String remoteUrl = flinkCluster.getActiveAddress().toURL() + 
"/" + flinkUrl;
                 JobsOverview jobsOverview = httpRestRequest(remoteUrl, 
JobsOverview.class);
@@ -755,7 +759,8 @@ public class FlinkTrackingTask {
 
     private CheckPoints httpCheckpoints(Application application, FlinkCluster 
flinkCluster) throws IOException {
         final String flinkUrl = "jobs/%s/checkpoints";
-        if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
+        ExecutionMode execMode = application.getExecutionModeEnum();
+        if (ExecutionMode.YARN_PER_JOB.equals(execMode) || 
ExecutionMode.YARN_APPLICATION.equals(execMode)) {
             String reqURL;
             if (StringUtils.isEmpty(application.getJobManagerUrl())) {
                 String format = "proxy/%s/" + flinkUrl;
@@ -765,7 +770,7 @@ public class FlinkTrackingTask {
                 reqURL = String.format(format, application.getJobManagerUrl(), 
application.getJobId());
             }
             return yarnRestRequest(reqURL, CheckPoints.class);
-        } else if (ExecutionMode.isRemoteMode(application.getExecutionMode())) 
{
+        } else if (ExecutionMode.REMOTE.equals(execMode) || 
ExecutionMode.YARN_SESSION.equals(execMode)) {
             if (application.getJobId() != null) {
                 String remoteUrl = flinkCluster.getActiveAddress().toURL() + 
"/" + String.format(flinkUrl, application.getJobId());
                 return httpRestRequest(remoteUrl, CheckPoints.class);
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue 
b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
index 813615e96..6fae3012f 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
@@ -2298,12 +2298,18 @@
 
       handleSubmitCustomJob(values) {
         const options = this.handleFormValue(values)
-        if(values.flinkClusterId){
+        if (values.flinkClusterId) {
           const cluster = this.flinkClusters.filter(c => c.id === 
values.flinkClusterId && c.clusterState === 1)[0] || null
           values.clusterId = cluster.id
           values.flinkClusterId = cluster.id
           values.yarnSessionClusterId = cluster.clusterId
         }
+        if (values.yarnSessionClusterId) {
+          const cluster = this.flinkClusters.filter(c => c.clusterId === 
values.yarnSessionClusterId && c.clusterState === 1)[0] || null
+          values.clusterId = cluster.id
+          values.flinkClusterId = cluster.id
+          values.yarnSessionClusterId = cluster.clusterId
+        }
         const params = {
           jobType: 1,
           executionMode: values.executionMode,
@@ -2395,12 +2401,18 @@
         } else {
           config = null
         }
-        if(values.flinkClusterId){
+        if (values.flinkClusterId) {
           const cluster = this.flinkClusters.filter(c => c.id === 
values.flinkClusterId && c.clusterState === 1)[0] || null
           values.clusterId = cluster.id
           values.flinkClusterId = cluster.id
           values.yarnSessionClusterId = cluster.clusterId
         }
+        if (values.yarnSessionClusterId) {
+          const cluster = this.flinkClusters.filter(c => c.clusterId === 
values.yarnSessionClusterId && c.clusterState === 1)[0] || null
+          values.clusterId = cluster.id
+          values.flinkClusterId = cluster.id
+          values.yarnSessionClusterId = cluster.clusterId
+        }
 
         const params = {
           jobType: 2,

Reply via email to