This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new d01b98d5c [Improve] Improved application process control (#4083)
d01b98d5c is described below

commit d01b98d5ceeb48e168f2d35c9afb59e706e3b7cb
Author: lenoxzhao <[email protected]>
AuthorDate: Tue Sep 24 07:21:33 2024 +0800

    [Improve] Improved application process control (#4083)
    
    * fix: add nickname
    
    * improve: improve exception check
    
    * fix: fix args fetch
    
    * improve: fix spark workspace and add spark sql demo
---
 .../apache/streampark/common/conf/Workspace.scala  |   2 +
 .../streampark/common/util/HadoopUtils.scala       |   6 +-
 .../streampark/common/util/PropertiesUtils.scala   |  11 +--
 .../src/main/assembly/script/data/mysql-data.sql   |  22 ++++-
 .../src/main/assembly/script/data/pgsql-data.sql   |  20 +++-
 .../core/controller/SparkProxyController.java      | 101 +++++++++++++++++++++
 .../console/core/entity/SparkApplication.java      |   5 +-
 .../console/core/runner/EnvInitializer.java        |   1 +
 .../console/core/service/ProxyService.java         |   3 +
 .../impl/SparkApplicationActionServiceImpl.java    |   3 +-
 .../impl/SparkApplicationManageServiceImpl.java    |   4 +-
 .../core/service/impl/ProxyServiceImpl.java        |  14 +++
 .../impl/SparkApplicationBackUpServiceImpl.java    |   2 +-
 .../console/core/watcher/SparkAppHttpWatcher.java  |  20 +++-
 .../src/main/resources/db/data-h2.sql              |  19 ++++
 .../mapper/core/SparkApplicationMapper.xml         |   6 ++
 .../src/views/spark/app/create.vue                 |  14 ++-
 .../src/views/spark/app/data/detail.data.ts        |   3 +-
 .../spark/client/bean/SubmitResponse.scala         |   1 +
 .../spark/client/proxy/SparkShimsProxy.scala       |   3 +-
 .../streampark/spark/client/impl/YarnClient.scala  |  43 ++++++---
 21 files changed, 268 insertions(+), 35 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
index cc9161f5e..c8ce1dd95 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
@@ -109,6 +109,8 @@ case class Workspace(storageType: StorageType) {
 
   lazy val APP_WORKSPACE = s"$WORKSPACE/workspace"
 
+  lazy val SPARK_APP_WORKSPACE = s"$WORKSPACE/spark-workspace"
+
   lazy val APP_FLINK = s"$WORKSPACE/flink"
 
   lazy val APP_SPARK = s"$WORKSPACE/spark"
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index 9262256c9..617127bb9 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs._
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.service.Service.STATE
-import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
+import org.apache.hadoop.yarn.api.records.{ApplicationId, FinalApplicationStatus, YarnApplicationState}
 import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 
@@ -277,6 +277,10 @@ object HadoopUtils extends Logger {
     YarnApplicationState.values.find(_.name() == state).orNull
   }
 
+  def toYarnFinalStatus(state: String): FinalApplicationStatus = {
+    FinalApplicationStatus.values.find(_.name() == state).orNull
+  }
+
   private class HadoopConfiguration extends Configuration {
 
     private lazy val rewriteNames = List(
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 9898ae973..f7f68ae45 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -411,18 +411,15 @@ object PropertiesUtils extends Logger {
   }
 
   /** extract spark configuration from sparkApplication.appArgs */
-  @Nonnull def extractSparkArgumentsAsJava(arguments: String): JavaList[String] =
-    new JavaArrayList[String](extractSparkArguments(arguments))
-
-  @Nonnull def extractSparkArguments(arguments: String): List[String] = {
-    if (StringUtils.isEmpty(arguments)) List.empty[String]
+  @Nonnull def extractSparkArgumentsAsJava(arguments: String): JavaList[String] = {
+    val list = new JavaArrayList[String]()
+    if (StringUtils.isEmpty(arguments)) list
     else {
-      val list = List[String]()
       arguments.split(SPARK_ARGUMENT_REGEXP) match {
         case d if Utils.isNotEmpty(d) =>
           d.foreach(x => {
             if (x.nonEmpty) {
-              list :+ x
+              list.add(x)
             }
           })
         case _ =>
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
index 0fe8fdf87..ce909e13d 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
@@ -51,6 +51,26 @@ insert into `t_flink_project` values (100000, 100000, 
'streampark-quickstart', '
 -- ----------------------------
 insert into `t_flink_sql` values (100000, 100000, 
'eNqlUUtPhDAQvu+vmFs1AYIHT5s94AaVqGxSSPZIKgxrY2mxrdGfb4GS3c0+LnJo6Mz36syapkmZQpk8vKbQMMt2KOFmAe5rK4Nf3yhrhCwvA1/TTDaqO61UxmooSprlT1PDGkgKEKpmwvIOjWVdP3W2zpG+JfQFHjfU46xxrVvYZuWztye1khJrqzSBFRCfjUwSYQiqt1xJJvyPcbWJp9WPCXvUoUEn0ZAVufcs0nIUjYn2L4s++YiY75eBLr+2Dnl3GYKTWRyfQKYRRR2XZxXmNvu9yh9GHAmUO/sxyMRkGNly4c714RZ7zaWtLHsX+N9NjvVrWxm99jmyvEhpOUhujmIYFI5zkCOYzYIj11a7QH7Tyz+nE8bw',
 null, null, 1, 1, now());
 
+-- ----------------------------
+-- Records of t_spark_app
+-- ----------------------------
+insert into `t_spark_app` (
+     `id`, `team_id`, `job_type`, `app_type`, `app_name`, `execution_mode`, 
`resource_from`, `main_class`,
+     `yarn_queue`, `k8s_image_pull_policy`, `k8s_namespace`, `state`, 
`option_state`, `user_id`,
+     `description`, `tracking`, `release`, `build`, `create_time`, 
`modify_time`, `tags`)
+values (100000, 100000, 2, 4, 'Spark SQL Demo', 2, 2, 
'org.apache.streampark.spark.cli.SqlClient', 'default', 0, 'default', 0, 0, 
100000, 'Spark SQL Demo', 0, 1, 1, now(), now(), 'streampark,test');
+
+-- ----------------------------
+-- Records of t_spark_effective
+-- ----------------------------
+insert into `t_spark_effective` values (100000, 100000, 4, 100000, now());
+
+-- ----------------------------
+-- Records of t_spark_sql
+-- ----------------------------
+insert into `t_spark_sql` values (100000, 100000, 
'eNq1jr0OgjAURnee4m4FY/oCTJVUg/KT9F7cK2kQiy2W+P6KMQ6yuDh9+YZzcjIlBUkgsSkkXCbv0N9Da0ifBgOx01cDSCqvdmsIpuu9e98kavA54EPH9ajbs+HTqIPl023gsyeN8gqlIsgrqhfmoygaiTEre2vYGliDgiW/IXvd2hdymIls0d87+5f6jxdlITOCFWxVXX5npg92MWtB',
 null, null, 1, 1, now());
+
+
 -- ----------------------------
 -- Records of t_menu
 -- ----------------------------
@@ -88,7 +108,7 @@ insert into `t_menu` values (110302, 110300, 'cluster edit', 
'/flink/edit_cluste
 
 insert into `t_menu` values (120100, 120000, 'spark.application', 
'/spark/app', 'spark/app/index', null, null, '0', 1, 2, now(), now());
 insert into `t_menu` values (120200, 120000, 'spark.sparkHome', '/spark/home', 
'spark/home/index', null, null, '0', 1, 3, now(), now());
-insert into `t_menu` values (120300, 120000, 'spark.createApplication', 
'/spark/app/create', 'spark/app/create', 'app:create', '', '0', 0, null, now(), 
now());
+insert into `t_menu` values (120300, 120000, 'spark.createApplication', 
'/spark/app/add', 'spark/app/create', 'app:create', '', '0', 0, null, now(), 
now());
 insert into `t_menu` values (120400, 120000, 'spark.updateApplication', 
'/spark/app/edit', 'spark/app/edit', 'app:update', '', '0', 0, null, now(), 
now());
 insert into `t_menu` values (120500, 120000, 'spark.applicationDetail', 
'/spark/app/detail', 'spark/app/detail', 'app:detail', '', '0', 0, null, now(), 
now());
 
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
index 9d82d4246..cbe75e12f 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
@@ -41,12 +41,30 @@ insert into "public"."t_flink_effective" values (100000, 
100000, 2, 100000, now(
 -- ----------------------------
 insert into "public"."t_flink_project" values (100000, 100000, 
'streampark-quickstart', 
'https://github.com/apache/incubator-streampark-quickstart', 'release-2.0.0', 
null, null, null, null, null, null, 1, 1, null, 'streampark-quickstart', -1, 
now(), now());
 
-
 -- ----------------------------
 -- Records of t_flink_sql
 -- ----------------------------
 insert into "public"."t_flink_sql" values (100000, 100000, 
'eNqlUUtPhDAQvu+vmFs1AYIHT5s94AaVqGxSSPZIKgxrY2mxrdGfb4GS3c0+LnJo6Mz36syapkmZQpk8vKbQMMt2KOFmAe5rK4Nf3yhrhCwvA1/TTDaqO61UxmooSprlT1PDGkgKEKpmwvIOjWVdP3W2zpG+JfQFHjfU46xxrVvYZuWztye1khJrqzSBFRCfjUwSYQiqt1xJJvyPcbWJp9WPCXvUoUEn0ZAVufcs0nIUjYn2L4s++YiY75eBLr+2Dnl3GYKTWRyfQKYRRR2XZxXmNvu9yh9GHAmUO/sxyMRkGNly4c714RZ7zaWtLHsX+N9NjvVrWxm99jmyvEhpOUhujmIYFI5zkCOYzYIj11a7QH7Tyz+nE8bw',
 null, null, 1, 1, now());
 
+-- ----------------------------
+-- Records of t_spark_app
+-- ----------------------------
+insert into "public"."t_spark_app" (
+     "id", "team_id", "job_type", "app_type", "app_name", "execution_mode", 
"resource_from", "main_class",
+     "yarn_queue", "k8s_image_pull_policy", "k8s_namespace", "state", 
"option_state", "user_id",
+     "description", "tracking", "release", "build", "create_time", 
"modify_time", "tags")
+values (100000, 100000, 2, 4, 'Spark SQL Demo', 2, 2, 
'org.apache.streampark.spark.cli.SqlClient', 'default', 0, 'default', 0, 0, 
100000, 'Spark SQL Demo', 0, 1, 1, now(), now(), 'streampark,test');
+
+-- ----------------------------
+-- Records of t_spark_effective
+-- ----------------------------
+insert into "t_spark_effective" values (100000, 100000, 4, 100000, now());
+
+-- ----------------------------
+-- Records of t_spark_sql
+-- ----------------------------
+insert into "t_spark_sql" values (100000, 100000, 
'eNq1jr0OgjAURnee4m4FY/oCTJVUg/KT9F7cK2kQiy2W+P6KMQ6yuDh9+YZzcjIlBUkgsSkkXCbv0N9Da0ifBgOx01cDSCqvdmsIpuu9e98kavA54EPH9ajbs+HTqIPl023gsyeN8gqlIsgrqhfmoygaiTEre2vYGliDgiW/IXvd2hdymIls0d87+5f6jxdlITOCFWxVXX5npg92MWtB',
 null, null, 1, 1, now());
+
 -- ----------------------------
 -- Records of t_menu
 -- ----------------------------
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java
new file mode 100644
index 000000000..c7ad83062
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.controller;
+
+import org.apache.streampark.console.base.exception.ApiAlertException;
+import org.apache.streampark.console.core.entity.SparkApplication;
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
+import org.apache.streampark.console.core.enums.UserTypeEnum;
+import org.apache.streampark.console.core.service.ProxyService;
+import org.apache.streampark.console.core.service.SparkApplicationLogService;
+import org.apache.streampark.console.core.service.application.SparkApplicationManageService;
+import org.apache.streampark.console.core.util.ServiceHelper;
+import org.apache.streampark.console.system.entity.Member;
+import org.apache.streampark.console.system.entity.User;
+import org.apache.streampark.console.system.service.MemberService;
+
+import org.apache.shiro.authz.annotation.RequiresPermissions;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.servlet.http.HttpServletRequest;
+
+@Slf4j
+@Validated
+@RestController
+@RequestMapping("spark/proxy")
+public class SparkProxyController {
+
+    @Autowired
+    private ProxyService proxyService;
+
+    @Autowired
+    private SparkApplicationManageService applicationManageService;
+
+    @Autowired
+    private SparkApplicationLogService logService;
+
+    @Autowired
+    private MemberService memberService;
+
+    @GetMapping("{type}/{id}/**")
+    @RequiresPermissions("app:view")
+    public ResponseEntity<?> proxySpark(HttpServletRequest request, @PathVariable("type") String type,
+                                        @PathVariable("id") Long id) throws Exception {
+        return proxy(type, request, id);
+    }
+
+    private ResponseEntity<?> proxy(String type, HttpServletRequest request, Long id) throws Exception {
+        SparkApplicationLog log;
+
+        switch (type) {
+            case "yarn":
+                log = logService.getById(id);
+                checkProxyAppLog(log);
+                return proxyService.proxyYarn(request, log);
+            default:
+                return ResponseEntity.notFound().build();
+        }
+    }
+
+    private void checkProxyApp(SparkApplication app) {
+        ApiAlertException.throwIfNull(app, "Invalid operation, application is invalid.");
+
+        User user = ServiceHelper.getLoginUser();
+        ApiAlertException.throwIfNull(user, "Permission denied, please login first.");
+
+        if (user.getUserType() != UserTypeEnum.ADMIN) {
+            Member member = memberService.getByTeamIdUserName(app.getTeamId(), user.getUsername());
+            ApiAlertException.throwIfNull(member,
+                "Permission denied, this job not created by the current user, And the job cannot be found in the current user's team.");
+        }
+    }
+
+    private void checkProxyAppLog(SparkApplicationLog log) {
+        ApiAlertException.throwIfNull(log, "Invalid operation, The application log not found.");
+        SparkApplication app = applicationManageService.getById(log.getAppId());
+        checkProxyApp(app);
+    }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
index 1685de3ea..4789b9195 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
@@ -204,6 +204,7 @@ public class SparkApplication extends BaseEntity {
     private transient String teamResource;
     private transient String dependency;
     private transient Long sqlId;
+    private transient String nickName;
     private transient String sparkSql;
     private transient Boolean backUp = false;
     private transient Boolean restart = false;
@@ -335,14 +336,14 @@ public class SparkApplication extends BaseEntity {
 
     @JsonIgnore
     public String getLocalAppHome() {
-        String path = String.format("%s/%s", 
Workspace.local().APP_WORKSPACE(), id.toString());
+        String path = String.format("%s/%s", 
Workspace.local().SPARK_APP_WORKSPACE(), id.toString());
         log.info("local appHome:{}", path);
         return path;
     }
 
     @JsonIgnore
     public String getRemoteAppHome() {
-        String path = String.format("%s/%s", 
Workspace.remote().APP_WORKSPACE(), id.toString());
+        String path = String.format("%s/%s", 
Workspace.remote().SPARK_APP_WORKSPACE(), id.toString());
         log.info("remote appHome:{}", path);
         return path;
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 4e5ba38ac..c2d026bad 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -147,6 +147,7 @@ public class EnvInitializer implements ApplicationRunner {
         Arrays.asList(
             workspace.APP_UPLOADS(),
             workspace.APP_WORKSPACE(),
+            workspace.SPARK_APP_WORKSPACE(),
             workspace.APP_BACKUPS(),
             workspace.APP_SAVEPOINTS(),
             workspace.APP_PYTHON(),
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
index d391017dc..8eed7e29d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service;
 
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.ApplicationLog;
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
 
 import org.springframework.http.ResponseEntity;
 
@@ -28,6 +29,8 @@ public interface ProxyService {
 
     ResponseEntity<?> proxyFlink(HttpServletRequest request, Application app) 
throws Exception;
 
+    ResponseEntity<?> proxyYarn(HttpServletRequest request, 
SparkApplicationLog log) throws Exception;
+
     ResponseEntity<?> proxyYarn(HttpServletRequest request, ApplicationLog 
log) throws Exception;
 
     ResponseEntity<?> proxyHistory(HttpServletRequest request, ApplicationLog 
log) throws Exception;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index 7b098c7a3..0c7296a3c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -360,6 +360,7 @@ public class SparkApplicationActionServiceImpl
                     application.setAppId(response.sparkAppId());
                 }
                 applicationLog.setSparkAppId(response.sparkAppId());
+                applicationLog.setTrackUrl(response.trackingUrl());
                 application.setStartTime(new Date());
                 application.setEndTime(null);
 
@@ -480,7 +481,7 @@ public class SparkApplicationActionServiceImpl
                     }
                 }
 
-                if (SparkExecutionMode.YARN_CLUSTER == executionModeEnum) {
+                if (SparkExecutionMode.isYarnMode(executionModeEnum)) {
                     switch (application.getApplicationType()) {
                         case STREAMPARK_SPARK:
                             sparkUserJar = String.format(
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
index f171594b7..c08830524 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
@@ -194,9 +194,9 @@ public class SparkApplicationManageServiceImpl
         try {
             application
                 .getFsOperator()
-                
.delete(application.getWorkspace().APP_WORKSPACE().concat("/").concat(appId.toString()));
+                
.delete(application.getWorkspace().SPARK_APP_WORKSPACE().concat("/").concat(appId.toString()));
             // try to delete yarn-application, and leave no trouble.
-            String path = 
Workspace.of(StorageType.HDFS).APP_WORKSPACE().concat("/").concat(appId.toString());
+            String path = 
Workspace.of(StorageType.HDFS).SPARK_APP_WORKSPACE().concat("/").concat(appId.toString());
             if (HdfsOperator.exists(path)) {
                 HdfsOperator.delete(path);
             }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
index 7a8b89c8d..7edc1e38b 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
@@ -22,6 +22,7 @@ import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.ApplicationLog;
 import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.ProxyService;
 import org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper;
@@ -140,6 +141,19 @@ public class ProxyServiceImpl implements ProxyService {
         return proxyYarnRequest(request, url);
     }
 
+    @Override
+    public ResponseEntity<?> proxyYarn(HttpServletRequest request, 
SparkApplicationLog log) throws Exception {
+        ResponseEntity.BodyBuilder builder = 
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
+        String yarnId = log.getSparkAppId();
+        if (StringUtils.isBlank(yarnId)) {
+            return builder.body("The yarn application id is null.");
+        }
+        String yarnURL = YarnUtils.getRMWebAppProxyURL();
+        String url = yarnURL + "/proxy/" + yarnId + "/";
+        url += getRequestURL(request, "/proxy/yarn/" + log.getId());
+        return proxyYarnRequest(request, url);
+    }
+
     @Override
     public ResponseEntity<?> proxyHistory(HttpServletRequest request, 
ApplicationLog log) throws Exception {
         ResponseEntity.BodyBuilder builder = 
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
index eaec6ab0d..0009738af 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
@@ -143,7 +143,7 @@ public class SparkApplicationBackUpServiceImpl
         if (!backUpPages.getRecords().isEmpty()) {
             SparkApplicationBackUp backup = backUpPages.getRecords().get(0);
             String path = backup.getPath();
-            appParam.getFsOperator().move(path, 
appParam.getWorkspace().APP_WORKSPACE());
+            appParam.getFsOperator().move(path, 
appParam.getWorkspace().SPARK_APP_WORKSPACE());
             super.removeById(backup.getId());
         }
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
index 46d5f73fe..8e7c15476 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.watcher;
 
+import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.util.JacksonUtils;
 import org.apache.streampark.console.core.bean.AlertTemplate;
@@ -27,6 +28,8 @@ import org.apache.streampark.console.core.enums.StopFromEnum;
 import org.apache.streampark.console.core.metrics.spark.Job;
 import 
org.apache.streampark.console.core.metrics.spark.SparkApplicationSummary;
 import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.SparkApplicationLogService;
+import org.apache.streampark.console.core.service.SparkEnvService;
 import org.apache.streampark.console.core.service.alert.AlertService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationActionService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
@@ -34,6 +37,7 @@ import 
org.apache.streampark.console.core.service.application.SparkApplicationMa
 import org.apache.streampark.console.core.utils.AlertTemplateUtils;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hc.core5.util.Timeout;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -74,6 +78,12 @@ public class SparkAppHttpWatcher {
     @Autowired
     private SparkApplicationInfoService applicationInfoService;
 
+    @Autowired
+    private SparkApplicationLogService applicationLogService;
+
+    @Autowired
+    private SparkEnvService sparkEnvService;
+
     @Autowired
     private AlertService alertService;
 
@@ -199,6 +209,8 @@ public class SparkAppHttpWatcher {
         } else {
             try {
                 String state = yarnAppInfo.getApp().getState();
+                FinalApplicationStatus appFinalStatus =
+                    
HadoopUtils.toYarnFinalStatus(yarnAppInfo.getApp().getFinalStatus());
                 SparkAppStateEnum sparkAppStateEnum = 
SparkAppStateEnum.of(state);
                 if (SparkAppStateEnum.OTHER == sparkAppStateEnum) {
                     return;
@@ -210,8 +222,15 @@ public class SparkAppHttpWatcher {
                         application.getAppId(),
                         sparkAppStateEnum);
                     application.setEndTime(new Date());
+                    if (appFinalStatus.equals(FinalApplicationStatus.FAILED)) {
+                        sparkAppStateEnum = SparkAppStateEnum.FAILED;
+                    }
                 }
                 if (SparkAppStateEnum.RUNNING == sparkAppStateEnum) {
+                    if (application.getStartTime() != null
+                        && application.getStartTime().getTime() > 0) {
+                        application.setDuration(System.currentTimeMillis() - 
application.getStartTime().getTime());
+                    }
                     SparkApplicationSummary summary;
                     try {
                         summary = httpStageAndTaskStatus(application);
@@ -305,7 +324,6 @@ public class SparkAppHttpWatcher {
         String reqURL = "ws/v1/cluster/apps/".concat(application.getAppId());
         return yarnRestRequest(reqURL, YarnAppInfo.class);
     }
-
     private Job[] httpJobsStatus(SparkApplication application) throws 
IOException {
         String format = "proxy/%s/api/v1/applications/%s/jobs";
         String reqURL = String.format(format, application.getAppId(), 
application.getAppId());
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
 
b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
index 474714271..770f9646c 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
+++ 
b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
@@ -45,6 +45,25 @@ insert into `t_flink_project` values (100000, 100000, 
'streampark-quickstart', '
 -- ----------------------------
 insert into `t_flink_sql` values (100000, 100000, 
'eNqlUUtPhDAQvu+vmFs1AYIHT5s94AaVqGxSSPZIKgxrY2mxrdGfb4GS3c0+LnJo6Mz36syapkmZQpk8vKbQMMt2KOFmAe5rK4Nf3yhrhCwvA1/TTDaqO61UxmooSprlT1PDGkgKEKpmwvIOjWVdP3W2zpG+JfQFHjfU46xxrVvYZuWztye1khJrqzSBFRCfjUwSYQiqt1xJJvyPcbWJp9WPCXvUoUEn0ZAVufcs0nIUjYn2L4s++YiY75eBLr+2Dnl3GYKTWRyfQKYRRR2XZxXmNvu9yh9GHAmUO/sxyMRkGNly4c714RZ7zaWtLHsX+N9NjvVrWxm99jmyvEhpOUhujmIYFI5zkCOYzYIj11a7QH7Tyz+nE8bw',
 null, null, 1, 1, now());
 
+-- ----------------------------
+-- Records of t_spark_app
+-- ----------------------------
+insert into `t_spark_app` (
+     `id`, `team_id`, `job_type`, `app_type`, `app_name`, `execution_mode`, 
`resource_from`, `main_class`,
+     `yarn_queue`, `k8s_image_pull_policy`, `k8s_namespace`, `state`, 
`option_state`, `user_id`,
+     `description`, `tracking`, `release`, `build`, `create_time`, 
`modify_time`, `tags`)
+values (100000, 100000, 2, 4, 'Spark SQL Demo', 2, 2, 
'org.apache.streampark.spark.cli.SqlClient', 'default', 0, 'default', 0, 0, 
100000, 'Spark SQL Demo', 0, 1, 1, now(), now(), 'streampark,test');
+
+-- ----------------------------
+-- Records of t_spark_effective
+-- ----------------------------
+insert into `t_spark_effective` values (100000, 100000, 4, 100000, now());
+
+-- ----------------------------
+-- Records of t_spark_sql
+-- ----------------------------
+insert into `t_spark_sql` values (100000, 100000, 
'eNq1jr0OgjAURnee4m4FY/oCTJVUg/KT9F7cK2kQiy2W+P6KMQ6yuDh9+YZzcjIlBUkgsSkkXCbv0N9Da0ifBgOx01cDSCqvdmsIpuu9e98kavA54EPH9ajbs+HTqIPl023gsyeN8gqlIsgrqhfmoygaiTEre2vYGliDgiW/IXvd2hdymIls0d87+5f6jxdlITOCFWxVXX5npg92MWtB',
 null, null, 1, 1, now());
+
 -- ----------------------------
 -- Records of t_menu
 -- ----------------------------
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml
index 2e0b6ce8b..bceb50b40 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml
@@ -124,6 +124,12 @@
         </where>
     </select>
 
+    <select id="selectApp" 
resultType="org.apache.streampark.console.core.entity.SparkApplication" 
parameterType="long">
+        select *
+        from t_spark_app
+        where id = #{id}
+    </select>
+
     <select id="selectAppsByTeamId" 
resultType="org.apache.streampark.console.core.entity.SparkApplication" 
parameterType="java.lang.Long">
         select
             t.*,
diff --git 
a/streampark-console/streampark-console-webapp/src/views/spark/app/create.vue 
b/streampark-console/streampark-console-webapp/src/views/spark/app/create.vue
index 81ba38d34..cf1b7e5d2 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/spark/app/create.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/spark/app/create.vue
@@ -102,9 +102,9 @@
   async function handleAppSubmit(formValue: Recordable) {
     let config = formValue.configOverride;
     if (config != null && config !== undefined && config.trim() != '') {
-      formValue.config = encryptByBase64(config);
+      config = encryptByBase64(config);
     } else {
-      formValue.config = null;
+      config = null;
     }
     if (formValue.jobType == JobTypeEnum.SQL) {
       if (formValue.sparkSql == null || formValue.sparkSql.trim() === '') {
@@ -116,9 +116,15 @@
           throw new Error(access);
         }
       }
-      handleSQLMode(formValue);
+      handleSQLMode({
+        ...formValue,
+        config,
+      });
     } else {
-      handleCustomJobMode(formValue);
+      handleCustomJobMode({
+        ...formValue,
+        config,
+      });
     }
   }
   /* send create request */
diff --git 
a/streampark-console/streampark-console-webapp/src/views/spark/app/data/detail.data.ts
 
b/streampark-console/streampark-console-webapp/src/views/spark/app/data/detail.data.ts
index 97413d76b..8ee123011 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/spark/app/data/detail.data.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/spark/app/data/detail.data.ts
@@ -113,7 +113,8 @@ export const getBackupColumns = (): BasicColumn[] => [
 
 export const getOptionLogColumns = (): BasicColumn[] => [
   { title: 'Operation Name', dataIndex: 'optionName' },
-  { title: 'Cluster Id', dataIndex: 'yarnAppId' },
+  { title: 'Application Id', dataIndex: 'sparkAppId' },
+  { title: 'Tracking Url', dataIndex: 'trackUrl' },
   { title: 'Start Status', dataIndex: 'success' },
   { title: 'Option Time', dataIndex: 'optionTime' },
 ];
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
index 0b58da799..4037d6b7f 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
@@ -21,4 +21,5 @@ import org.apache.streampark.common.util.Implicits.JavaMap
 
 case class SubmitResponse(
     var sparkAppId: String,
+    trackingUrl: String,
     sparkProperties: JavaMap[String, String])
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
index a62ed677c..9287f72db 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
@@ -56,7 +56,8 @@ object SparkShimsProxy extends Logger {
     "ch.qos.logback",
     "org.xml",
     "org.w3c",
-    "org.apache.hadoop")
+    "org.apache.hadoop",
+    "org.apache.spark.launcher")
 
   def proxy[T](sparkVersion: SparkVersion, func: ClassLoader => T): T = {
     val shimsClassLoader = getSparkShimsClassLoader(sparkVersion)
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
index 15cf4d60b..031400825 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
@@ -19,11 +19,12 @@ package org.apache.streampark.spark.client.impl
 
 import 
org.apache.streampark.common.conf.ConfigKeys.{KEY_SPARK_YARN_AM_NODE_LABEL, 
KEY_SPARK_YARN_EXECUTOR_NODE_LABEL, KEY_SPARK_YARN_QUEUE, 
KEY_SPARK_YARN_QUEUE_LABEL, KEY_SPARK_YARN_QUEUE_NAME}
 import org.apache.streampark.common.enums.SparkExecutionMode
-import org.apache.streampark.common.util.HadoopUtils
+import org.apache.streampark.common.util.{HadoopUtils, YarnUtils}
 import org.apache.streampark.common.util.Implicits._
 import org.apache.streampark.spark.client.`trait`.SparkClientTrait
 import org.apache.streampark.spark.client.bean._
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
 
@@ -74,17 +75,23 @@ object YarnClient extends SparkClientTrait {
     // 3) launch
     Try(launch(launcher)) match {
       case Success(handle: SparkAppHandle) =>
-        logger.info(s"[StreamPark][Spark][YarnClient] spark job: 
${submitRequest.appName} is submit successful, " +
-          s"appid: ${handle.getAppId}, " +
-          s"state: ${handle.getState}")
-        sparkHandles += handle.getAppId -> handle
-        SubmitResponse(handle.getAppId, submitRequest.appProperties)
+        if (handle.getError.isPresent) {
+          logger.info(s"[StreamPark][Spark][YarnClient] spark job: 
${submitRequest.appName} submit failed.")
+          throw handle.getError.get()
+        } else {
+          logger.info(s"[StreamPark][Spark][YarnClient] spark job: 
${submitRequest.appName} submit successfully, " +
+            s"appid: ${handle.getAppId}, " +
+            s"state: ${handle.getState}")
+          sparkHandles += handle.getAppId -> handle
+          val trackingUrl = 
YarnUtils.getYarnAppTrackingUrl(HadoopUtils.toApplicationId(handle.getAppId))
+          SubmitResponse(handle.getAppId, trackingUrl, 
submitRequest.appProperties)
+        }
       case Failure(e) => throw e
     }
   }
 
   private def launch(sparkLauncher: SparkLauncher): SparkAppHandle = {
-    logger.info("[StreamPark][Spark][YarnClient] The spark job starting")
+    logger.info("[StreamPark][Spark][YarnClient] The spark job start 
submitting")
     val submitFinished: CountDownLatch = new CountDownLatch(1)
     val sparkAppHandle = sparkLauncher.startApplication(new 
SparkAppHandle.Listener() {
       override def infoChanged(sparkAppHandle: SparkAppHandle): Unit = {}
@@ -92,14 +99,22 @@ object YarnClient extends SparkClientTrait {
         if (handle.getAppId != null) {
           logger.info(s"${handle.getAppId} stateChanged : 
${handle.getState.toString}")
         } else {
-          logger.info("stateChanged :{}", handle.getState.toString)
-        }
-        if (SparkAppHandle.State.FAILED == handle.getState) {
-          logger.error("Task run failure stateChanged :{}", 
handle.getState.toString)
+          logger.info("stateChanged : {}", handle.getState.toString)
         }
         if (handle.getAppId != null && submitFinished.getCount != 0) {
+          // Task submission succeeded
           submitFinished.countDown()
         }
+        if (handle.getState.isFinal) {
+          if (StringUtils.isNotBlank(handle.getAppId) && 
sparkHandles.containsKey(handle.getAppId)) {
+            sparkHandles.remove(handle.getAppId)
+          }
+          if (submitFinished.getCount != 0) {
+            // Task submission failed
+            submitFinished.countDown()
+          }
+          logger.info("Task is end, final state : {}", 
handle.getState.toString)
+        }
       }
     })
     submitFinished.await()
@@ -107,7 +122,11 @@ object YarnClient extends SparkClientTrait {
   }
 
   private def prepareSparkLauncher(submitRequest: SubmitRequest) = {
-    new SparkLauncher()
+    val env = new JavaHashMap[String, String]()
+    if (StringUtils.isNotBlank(submitRequest.hadoopUser)) {
+      env.put("HADOOP_USER_NAME", submitRequest.hadoopUser)
+    }
+    new SparkLauncher(env)
       .setSparkHome(submitRequest.sparkVersion.sparkHome)
       .setAppResource(submitRequest.userJarPath)
       .setMainClass(submitRequest.appMain)


Reply via email to