This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new d01b98d5c [Improve] Improved application process control (#4083)
d01b98d5c is described below
commit d01b98d5ceeb48e168f2d35c9afb59e706e3b7cb
Author: lenoxzhao <[email protected]>
AuthorDate: Tue Sep 24 07:21:33 2024 +0800
[Improve] Improved application process control (#4083)
* fix: add nickname
* improve: improve exception check
* fix: fix args fetch
* improve: fix spark workspace and add spark sql demo
---
.../apache/streampark/common/conf/Workspace.scala | 2 +
.../streampark/common/util/HadoopUtils.scala | 6 +-
.../streampark/common/util/PropertiesUtils.scala | 11 +--
.../src/main/assembly/script/data/mysql-data.sql | 22 ++++-
.../src/main/assembly/script/data/pgsql-data.sql | 20 +++-
.../core/controller/SparkProxyController.java | 101 +++++++++++++++++++++
.../console/core/entity/SparkApplication.java | 5 +-
.../console/core/runner/EnvInitializer.java | 1 +
.../console/core/service/ProxyService.java | 3 +
.../impl/SparkApplicationActionServiceImpl.java | 3 +-
.../impl/SparkApplicationManageServiceImpl.java | 4 +-
.../core/service/impl/ProxyServiceImpl.java | 14 +++
.../impl/SparkApplicationBackUpServiceImpl.java | 2 +-
.../console/core/watcher/SparkAppHttpWatcher.java | 20 +++-
.../src/main/resources/db/data-h2.sql | 19 ++++
.../mapper/core/SparkApplicationMapper.xml | 6 ++
.../src/views/spark/app/create.vue | 14 ++-
.../src/views/spark/app/data/detail.data.ts | 3 +-
.../spark/client/bean/SubmitResponse.scala | 1 +
.../spark/client/proxy/SparkShimsProxy.scala | 3 +-
.../streampark/spark/client/impl/YarnClient.scala | 43 ++++++---
21 files changed, 268 insertions(+), 35 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
index cc9161f5e..c8ce1dd95 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
@@ -109,6 +109,8 @@ case class Workspace(storageType: StorageType) {
lazy val APP_WORKSPACE = s"$WORKSPACE/workspace"
+ lazy val SPARK_APP_WORKSPACE = s"$WORKSPACE/spark-workspace"
+
lazy val APP_FLINK = s"$WORKSPACE/flink"
lazy val APP_SPARK = s"$WORKSPACE/spark"
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index 9262256c9..617127bb9 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs._
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.service.Service.STATE
-import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
+import org.apache.hadoop.yarn.api.records.{ApplicationId,
FinalApplicationStatus, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -277,6 +277,10 @@ object HadoopUtils extends Logger {
YarnApplicationState.values.find(_.name() == state).orNull
}
+ def toYarnFinalStatus(state: String): FinalApplicationStatus = {
+ FinalApplicationStatus.values.find(_.name() == state).orNull
+ }
+
private class HadoopConfiguration extends Configuration {
private lazy val rewriteNames = List(
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 9898ae973..f7f68ae45 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -411,18 +411,15 @@ object PropertiesUtils extends Logger {
}
/** extract spark configuration from sparkApplication.appArgs */
- @Nonnull def extractSparkArgumentsAsJava(arguments: String):
JavaList[String] =
- new JavaArrayList[String](extractSparkArguments(arguments))
-
- @Nonnull def extractSparkArguments(arguments: String): List[String] = {
- if (StringUtils.isEmpty(arguments)) List.empty[String]
+ @Nonnull def extractSparkArgumentsAsJava(arguments: String):
JavaList[String] = {
+ val list = new JavaArrayList[String]()
+ if (StringUtils.isEmpty(arguments)) list
else {
- val list = List[String]()
arguments.split(SPARK_ARGUMENT_REGEXP) match {
case d if Utils.isNotEmpty(d) =>
d.foreach(x => {
if (x.nonEmpty) {
- list :+ x
+ list.add(x)
}
})
case _ =>
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
index 0fe8fdf87..ce909e13d 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
@@ -51,6 +51,26 @@ insert into `t_flink_project` values (100000, 100000,
'streampark-quickstart', '
-- ----------------------------
insert into `t_flink_sql` values (100000, 100000,
'eNqlUUtPhDAQvu+vmFs1AYIHT5s94AaVqGxSSPZIKgxrY2mxrdGfb4GS3c0+LnJo6Mz36syapkmZQpk8vKbQMMt2KOFmAe5rK4Nf3yhrhCwvA1/TTDaqO61UxmooSprlT1PDGkgKEKpmwvIOjWVdP3W2zpG+JfQFHjfU46xxrVvYZuWztye1khJrqzSBFRCfjUwSYQiqt1xJJvyPcbWJp9WPCXvUoUEn0ZAVufcs0nIUjYn2L4s++YiY75eBLr+2Dnl3GYKTWRyfQKYRRR2XZxXmNvu9yh9GHAmUO/sxyMRkGNly4c714RZ7zaWtLHsX+N9NjvVrWxm99jmyvEhpOUhujmIYFI5zkCOYzYIj11a7QH7Tyz+nE8bw',
null, null, 1, 1, now());
+-- ----------------------------
+-- Records of t_spark_app
+-- ----------------------------
+insert into `t_spark_app` (
+ `id`, `team_id`, `job_type`, `app_type`, `app_name`, `execution_mode`,
`resource_from`, `main_class`,
+ `yarn_queue`, `k8s_image_pull_policy`, `k8s_namespace`, `state`,
`option_state`, `user_id`,
+ `description`, `tracking`, `release`, `build`, `create_time`,
`modify_time`, `tags`)
+values (100000, 100000, 2, 4, 'Spark SQL Demo', 2, 2,
'org.apache.streampark.spark.cli.SqlClient', 'default', 0, 'default', 0, 0,
100000, 'Spark SQL Demo', 0, 1, 1, now(), now(), 'streampark,test');
+
+-- ----------------------------
+-- Records of t_spark_effective
+-- ----------------------------
+insert into `t_spark_effective` values (100000, 100000, 4, 100000, now());
+
+-- ----------------------------
+-- Records of t_spark_sql
+-- ----------------------------
+insert into `t_spark_sql` values (100000, 100000,
'eNq1jr0OgjAURnee4m4FY/oCTJVUg/KT9F7cK2kQiy2W+P6KMQ6yuDh9+YZzcjIlBUkgsSkkXCbv0N9Da0ifBgOx01cDSCqvdmsIpuu9e98kavA54EPH9ajbs+HTqIPl023gsyeN8gqlIsgrqhfmoygaiTEre2vYGliDgiW/IXvd2hdymIls0d87+5f6jxdlITOCFWxVXX5npg92MWtB',
null, null, 1, 1, now());
+
+
-- ----------------------------
-- Records of t_menu
-- ----------------------------
@@ -88,7 +108,7 @@ insert into `t_menu` values (110302, 110300, 'cluster edit',
'/flink/edit_cluste
insert into `t_menu` values (120100, 120000, 'spark.application',
'/spark/app', 'spark/app/index', null, null, '0', 1, 2, now(), now());
insert into `t_menu` values (120200, 120000, 'spark.sparkHome', '/spark/home',
'spark/home/index', null, null, '0', 1, 3, now(), now());
-insert into `t_menu` values (120300, 120000, 'spark.createApplication',
'/spark/app/create', 'spark/app/create', 'app:create', '', '0', 0, null, now(),
now());
+insert into `t_menu` values (120300, 120000, 'spark.createApplication',
'/spark/app/add', 'spark/app/create', 'app:create', '', '0', 0, null, now(),
now());
insert into `t_menu` values (120400, 120000, 'spark.updateApplication',
'/spark/app/edit', 'spark/app/edit', 'app:update', '', '0', 0, null, now(),
now());
insert into `t_menu` values (120500, 120000, 'spark.applicationDetail',
'/spark/app/detail', 'spark/app/detail', 'app:detail', '', '0', 0, null, now(),
now());
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql b/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
index 9d82d4246..cbe75e12f 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
@@ -41,12 +41,30 @@ insert into "public"."t_flink_effective" values (100000,
100000, 2, 100000, now(
-- ----------------------------
insert into "public"."t_flink_project" values (100000, 100000,
'streampark-quickstart',
'https://github.com/apache/incubator-streampark-quickstart', 'release-2.0.0',
null, null, null, null, null, null, 1, 1, null, 'streampark-quickstart', -1,
now(), now());
-
-- ----------------------------
-- Records of t_flink_sql
-- ----------------------------
insert into "public"."t_flink_sql" values (100000, 100000,
'eNqlUUtPhDAQvu+vmFs1AYIHT5s94AaVqGxSSPZIKgxrY2mxrdGfb4GS3c0+LnJo6Mz36syapkmZQpk8vKbQMMt2KOFmAe5rK4Nf3yhrhCwvA1/TTDaqO61UxmooSprlT1PDGkgKEKpmwvIOjWVdP3W2zpG+JfQFHjfU46xxrVvYZuWztye1khJrqzSBFRCfjUwSYQiqt1xJJvyPcbWJp9WPCXvUoUEn0ZAVufcs0nIUjYn2L4s++YiY75eBLr+2Dnl3GYKTWRyfQKYRRR2XZxXmNvu9yh9GHAmUO/sxyMRkGNly4c714RZ7zaWtLHsX+N9NjvVrWxm99jmyvEhpOUhujmIYFI5zkCOYzYIj11a7QH7Tyz+nE8bw',
null, null, 1, 1, now());
+-- ----------------------------
+-- Records of t_spark_app
+-- ----------------------------
+insert into "public"."t_spark_app" (
+ "id", "team_id", "job_type", "app_type", "app_name", "execution_mode",
"resource_from", "main_class",
+ "yarn_queue", "k8s_image_pull_policy", "k8s_namespace", "state",
"option_state", "user_id",
+ "description", "tracking", "release", "build", "create_time",
"modify_time", "tags")
+values (100000, 100000, 2, 4, 'Spark SQL Demo', 2, 2,
'org.apache.streampark.spark.cli.SqlClient', 'default', 0, 'default', 0, 0,
100000, 'Spark SQL Demo', 0, 1, 1, now(), now(), 'streampark,test');
+
+-- ----------------------------
+-- Records of t_spark_effective
+-- ----------------------------
+insert into "t_spark_effective" values (100000, 100000, 4, 100000, now());
+
+-- ----------------------------
+-- Records of t_spark_sql
+-- ----------------------------
+insert into "t_spark_sql" values (100000, 100000,
'eNq1jr0OgjAURnee4m4FY/oCTJVUg/KT9F7cK2kQiy2W+P6KMQ6yuDh9+YZzcjIlBUkgsSkkXCbv0N9Da0ifBgOx01cDSCqvdmsIpuu9e98kavA54EPH9ajbs+HTqIPl023gsyeN8gqlIsgrqhfmoygaiTEre2vYGliDgiW/IXvd2hdymIls0d87+5f6jxdlITOCFWxVXX5npg92MWtB',
null, null, 1, 1, now());
+
-- ----------------------------
-- Records of t_menu
-- ----------------------------
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java
new file mode 100644
index 000000000..c7ad83062
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.controller;
+
+import org.apache.streampark.console.base.exception.ApiAlertException;
+import org.apache.streampark.console.core.entity.SparkApplication;
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
+import org.apache.streampark.console.core.enums.UserTypeEnum;
+import org.apache.streampark.console.core.service.ProxyService;
+import org.apache.streampark.console.core.service.SparkApplicationLogService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
+import org.apache.streampark.console.core.util.ServiceHelper;
+import org.apache.streampark.console.system.entity.Member;
+import org.apache.streampark.console.system.entity.User;
+import org.apache.streampark.console.system.service.MemberService;
+
+import org.apache.shiro.authz.annotation.RequiresPermissions;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.servlet.http.HttpServletRequest;
+
+@Slf4j
+@Validated
+@RestController
+@RequestMapping("spark/proxy")
+public class SparkProxyController {
+
+ @Autowired
+ private ProxyService proxyService;
+
+ @Autowired
+ private SparkApplicationManageService applicationManageService;
+
+ @Autowired
+ private SparkApplicationLogService logService;
+
+ @Autowired
+ private MemberService memberService;
+
+ @GetMapping("{type}/{id}/**")
+ @RequiresPermissions("app:view")
+ public ResponseEntity<?> proxySpark(HttpServletRequest request,
@PathVariable("type") String type,
+ @PathVariable("id") Long id) throws
Exception {
+ return proxy(type, request, id);
+ }
+
+ private ResponseEntity<?> proxy(String type, HttpServletRequest request,
Long id) throws Exception {
+ SparkApplicationLog log;
+
+ switch (type) {
+ case "yarn":
+ log = logService.getById(id);
+ checkProxyAppLog(log);
+ return proxyService.proxyYarn(request, log);
+ default:
+ return ResponseEntity.notFound().build();
+ }
+ }
+
+ private void checkProxyApp(SparkApplication app) {
+ ApiAlertException.throwIfNull(app, "Invalid operation, application is
invalid.");
+
+ User user = ServiceHelper.getLoginUser();
+ ApiAlertException.throwIfNull(user, "Permission denied, please login
first.");
+
+ if (user.getUserType() != UserTypeEnum.ADMIN) {
+ Member member = memberService.getByTeamIdUserName(app.getTeamId(),
user.getUsername());
+ ApiAlertException.throwIfNull(member,
+ "Permission denied, this job not created by the current user,
And the job cannot be found in the current user's team.");
+ }
+ }
+
+ private void checkProxyAppLog(SparkApplicationLog log) {
+ ApiAlertException.throwIfNull(log, "Invalid operation, The application
log not found.");
+ SparkApplication app =
applicationManageService.getById(log.getAppId());
+ checkProxyApp(app);
+ }
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
index 1685de3ea..4789b9195 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
@@ -204,6 +204,7 @@ public class SparkApplication extends BaseEntity {
private transient String teamResource;
private transient String dependency;
private transient Long sqlId;
+ private transient String nickName;
private transient String sparkSql;
private transient Boolean backUp = false;
private transient Boolean restart = false;
@@ -335,14 +336,14 @@ public class SparkApplication extends BaseEntity {
@JsonIgnore
public String getLocalAppHome() {
- String path = String.format("%s/%s",
Workspace.local().APP_WORKSPACE(), id.toString());
+ String path = String.format("%s/%s",
Workspace.local().SPARK_APP_WORKSPACE(), id.toString());
log.info("local appHome:{}", path);
return path;
}
@JsonIgnore
public String getRemoteAppHome() {
- String path = String.format("%s/%s",
Workspace.remote().APP_WORKSPACE(), id.toString());
+ String path = String.format("%s/%s",
Workspace.remote().SPARK_APP_WORKSPACE(), id.toString());
log.info("remote appHome:{}", path);
return path;
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 4e5ba38ac..c2d026bad 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -147,6 +147,7 @@ public class EnvInitializer implements ApplicationRunner {
Arrays.asList(
workspace.APP_UPLOADS(),
workspace.APP_WORKSPACE(),
+ workspace.SPARK_APP_WORKSPACE(),
workspace.APP_BACKUPS(),
workspace.APP_SAVEPOINTS(),
workspace.APP_PYTHON(),
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
index d391017dc..8eed7e29d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationLog;
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
import org.springframework.http.ResponseEntity;
@@ -28,6 +29,8 @@ public interface ProxyService {
ResponseEntity<?> proxyFlink(HttpServletRequest request, Application app)
throws Exception;
+ ResponseEntity<?> proxyYarn(HttpServletRequest request,
SparkApplicationLog log) throws Exception;
+
ResponseEntity<?> proxyYarn(HttpServletRequest request, ApplicationLog
log) throws Exception;
ResponseEntity<?> proxyHistory(HttpServletRequest request, ApplicationLog
log) throws Exception;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index 7b098c7a3..0c7296a3c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -360,6 +360,7 @@ public class SparkApplicationActionServiceImpl
application.setAppId(response.sparkAppId());
}
applicationLog.setSparkAppId(response.sparkAppId());
+ applicationLog.setTrackUrl(response.trackingUrl());
application.setStartTime(new Date());
application.setEndTime(null);
@@ -480,7 +481,7 @@ public class SparkApplicationActionServiceImpl
}
}
- if (SparkExecutionMode.YARN_CLUSTER == executionModeEnum) {
+ if (SparkExecutionMode.isYarnMode(executionModeEnum)) {
switch (application.getApplicationType()) {
case STREAMPARK_SPARK:
sparkUserJar = String.format(
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
index f171594b7..c08830524 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
@@ -194,9 +194,9 @@ public class SparkApplicationManageServiceImpl
try {
application
.getFsOperator()
-
.delete(application.getWorkspace().APP_WORKSPACE().concat("/").concat(appId.toString()));
+
.delete(application.getWorkspace().SPARK_APP_WORKSPACE().concat("/").concat(appId.toString()));
// try to delete yarn-application, and leave no trouble.
- String path =
Workspace.of(StorageType.HDFS).APP_WORKSPACE().concat("/").concat(appId.toString());
+ String path =
Workspace.of(StorageType.HDFS).SPARK_APP_WORKSPACE().concat("/").concat(appId.toString());
if (HdfsOperator.exists(path)) {
HdfsOperator.delete(path);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
index 7a8b89c8d..7edc1e38b 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
@@ -22,6 +22,7 @@ import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.ProxyService;
import org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper;
@@ -140,6 +141,19 @@ public class ProxyServiceImpl implements ProxyService {
return proxyYarnRequest(request, url);
}
+ @Override
+ public ResponseEntity<?> proxyYarn(HttpServletRequest request,
SparkApplicationLog log) throws Exception {
+ ResponseEntity.BodyBuilder builder =
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
+ String yarnId = log.getSparkAppId();
+ if (StringUtils.isBlank(yarnId)) {
+ return builder.body("The yarn application id is null.");
+ }
+ String yarnURL = YarnUtils.getRMWebAppProxyURL();
+ String url = yarnURL + "/proxy/" + yarnId + "/";
+ url += getRequestURL(request, "/proxy/yarn/" + log.getId());
+ return proxyYarnRequest(request, url);
+ }
+
@Override
public ResponseEntity<?> proxyHistory(HttpServletRequest request,
ApplicationLog log) throws Exception {
ResponseEntity.BodyBuilder builder =
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
index eaec6ab0d..0009738af 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
@@ -143,7 +143,7 @@ public class SparkApplicationBackUpServiceImpl
if (!backUpPages.getRecords().isEmpty()) {
SparkApplicationBackUp backup = backUpPages.getRecords().get(0);
String path = backup.getPath();
- appParam.getFsOperator().move(path,
appParam.getWorkspace().APP_WORKSPACE());
+ appParam.getFsOperator().move(path,
appParam.getWorkspace().SPARK_APP_WORKSPACE());
super.removeById(backup.getId());
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
index 46d5f73fe..8e7c15476 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.watcher;
+import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.core.bean.AlertTemplate;
@@ -27,6 +28,8 @@ import org.apache.streampark.console.core.enums.StopFromEnum;
import org.apache.streampark.console.core.metrics.spark.Job;
import
org.apache.streampark.console.core.metrics.spark.SparkApplicationSummary;
import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.SparkApplicationLogService;
+import org.apache.streampark.console.core.service.SparkEnvService;
import org.apache.streampark.console.core.service.alert.AlertService;
import
org.apache.streampark.console.core.service.application.SparkApplicationActionService;
import
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
@@ -34,6 +37,7 @@ import
org.apache.streampark.console.core.service.application.SparkApplicationMa
import org.apache.streampark.console.core.utils.AlertTemplateUtils;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hc.core5.util.Timeout;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -74,6 +78,12 @@ public class SparkAppHttpWatcher {
@Autowired
private SparkApplicationInfoService applicationInfoService;
+ @Autowired
+ private SparkApplicationLogService applicationLogService;
+
+ @Autowired
+ private SparkEnvService sparkEnvService;
+
@Autowired
private AlertService alertService;
@@ -199,6 +209,8 @@ public class SparkAppHttpWatcher {
} else {
try {
String state = yarnAppInfo.getApp().getState();
+ FinalApplicationStatus appFinalStatus =
+
HadoopUtils.toYarnFinalStatus(yarnAppInfo.getApp().getFinalStatus());
SparkAppStateEnum sparkAppStateEnum =
SparkAppStateEnum.of(state);
if (SparkAppStateEnum.OTHER == sparkAppStateEnum) {
return;
@@ -210,8 +222,15 @@ public class SparkAppHttpWatcher {
application.getAppId(),
sparkAppStateEnum);
application.setEndTime(new Date());
+ if (appFinalStatus.equals(FinalApplicationStatus.FAILED)) {
+ sparkAppStateEnum = SparkAppStateEnum.FAILED;
+ }
}
if (SparkAppStateEnum.RUNNING == sparkAppStateEnum) {
+ if (application.getStartTime() != null
+ && application.getStartTime().getTime() > 0) {
+ application.setDuration(System.currentTimeMillis() -
application.getStartTime().getTime());
+ }
SparkApplicationSummary summary;
try {
summary = httpStageAndTaskStatus(application);
@@ -305,7 +324,6 @@ public class SparkAppHttpWatcher {
String reqURL = "ws/v1/cluster/apps/".concat(application.getAppId());
return yarnRestRequest(reqURL, YarnAppInfo.class);
}
-
private Job[] httpJobsStatus(SparkApplication application) throws
IOException {
String format = "proxy/%s/api/v1/applications/%s/jobs";
String reqURL = String.format(format, application.getAppId(),
application.getAppId());
diff --git a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
index 474714271..770f9646c 100644
--- a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
+++ b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
@@ -45,6 +45,25 @@ insert into `t_flink_project` values (100000, 100000,
'streampark-quickstart', '
-- ----------------------------
insert into `t_flink_sql` values (100000, 100000,
'eNqlUUtPhDAQvu+vmFs1AYIHT5s94AaVqGxSSPZIKgxrY2mxrdGfb4GS3c0+LnJo6Mz36syapkmZQpk8vKbQMMt2KOFmAe5rK4Nf3yhrhCwvA1/TTDaqO61UxmooSprlT1PDGkgKEKpmwvIOjWVdP3W2zpG+JfQFHjfU46xxrVvYZuWztye1khJrqzSBFRCfjUwSYQiqt1xJJvyPcbWJp9WPCXvUoUEn0ZAVufcs0nIUjYn2L4s++YiY75eBLr+2Dnl3GYKTWRyfQKYRRR2XZxXmNvu9yh9GHAmUO/sxyMRkGNly4c714RZ7zaWtLHsX+N9NjvVrWxm99jmyvEhpOUhujmIYFI5zkCOYzYIj11a7QH7Tyz+nE8bw',
null, null, 1, 1, now());
+-- ----------------------------
+-- Records of t_spark_app
+-- ----------------------------
+insert into `t_spark_app` (
+ `id`, `team_id`, `job_type`, `app_type`, `app_name`, `execution_mode`,
`resource_from`, `main_class`,
+ `yarn_queue`, `k8s_image_pull_policy`, `k8s_namespace`, `state`,
`option_state`, `user_id`,
+ `description`, `tracking`, `release`, `build`, `create_time`,
`modify_time`, `tags`)
+values (100000, 100000, 2, 4, 'Spark SQL Demo', 2, 2,
'org.apache.streampark.spark.cli.SqlClient', 'default', 0, 'default', 0, 0,
100000, 'Spark SQL Demo', 0, 1, 1, now(), now(), 'streampark,test');
+
+-- ----------------------------
+-- Records of t_spark_effective
+-- ----------------------------
+insert into `t_spark_effective` values (100000, 100000, 4, 100000, now());
+
+-- ----------------------------
+-- Records of t_spark_sql
+-- ----------------------------
+insert into `t_spark_sql` values (100000, 100000,
'eNq1jr0OgjAURnee4m4FY/oCTJVUg/KT9F7cK2kQiy2W+P6KMQ6yuDh9+YZzcjIlBUkgsSkkXCbv0N9Da0ifBgOx01cDSCqvdmsIpuu9e98kavA54EPH9ajbs+HTqIPl023gsyeN8gqlIsgrqhfmoygaiTEre2vYGliDgiW/IXvd2hdymIls0d87+5f6jxdlITOCFWxVXX5npg92MWtB',
null, null, 1, 1, now());
+
-- ----------------------------
-- Records of t_menu
-- ----------------------------
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml
index 2e0b6ce8b..bceb50b40 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml
@@ -124,6 +124,12 @@
</where>
</select>
+ <select id="selectApp"
resultType="org.apache.streampark.console.core.entity.SparkApplication"
parameterType="long">
+ select *
+ from t_spark_app
+ where id = #{id}
+ </select>
+
<select id="selectAppsByTeamId"
resultType="org.apache.streampark.console.core.entity.SparkApplication"
parameterType="java.lang.Long">
select
t.*,
diff --git a/streampark-console/streampark-console-webapp/src/views/spark/app/create.vue b/streampark-console/streampark-console-webapp/src/views/spark/app/create.vue
index 81ba38d34..cf1b7e5d2 100644
--- a/streampark-console/streampark-console-webapp/src/views/spark/app/create.vue
+++ b/streampark-console/streampark-console-webapp/src/views/spark/app/create.vue
@@ -102,9 +102,9 @@
async function handleAppSubmit(formValue: Recordable) {
let config = formValue.configOverride;
if (config != null && config !== undefined && config.trim() != '') {
- formValue.config = encryptByBase64(config);
+ config = encryptByBase64(config);
} else {
- formValue.config = null;
+ config = null;
}
if (formValue.jobType == JobTypeEnum.SQL) {
if (formValue.sparkSql == null || formValue.sparkSql.trim() === '') {
@@ -116,9 +116,15 @@
throw new Error(access);
}
}
- handleSQLMode(formValue);
+ handleSQLMode({
+ ...formValue,
+ config,
+ });
} else {
- handleCustomJobMode(formValue);
+ handleCustomJobMode({
+ ...formValue,
+ config,
+ });
}
}
/* send create request */
diff --git
a/streampark-console/streampark-console-webapp/src/views/spark/app/data/detail.data.ts
b/streampark-console/streampark-console-webapp/src/views/spark/app/data/detail.data.ts
index 97413d76b..8ee123011 100644
---
a/streampark-console/streampark-console-webapp/src/views/spark/app/data/detail.data.ts
+++
b/streampark-console/streampark-console-webapp/src/views/spark/app/data/detail.data.ts
@@ -113,7 +113,8 @@ export const getBackupColumns = (): BasicColumn[] => [
export const getOptionLogColumns = (): BasicColumn[] => [
{ title: 'Operation Name', dataIndex: 'optionName' },
- { title: 'Cluster Id', dataIndex: 'yarnAppId' },
+ { title: 'Application Id', dataIndex: 'sparkAppId' },
+ { title: 'Tracking Url', dataIndex: 'trackUrl' },
{ title: 'Start Status', dataIndex: 'success' },
{ title: 'Option Time', dataIndex: 'optionTime' },
];
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
index 0b58da799..4037d6b7f 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
@@ -21,4 +21,5 @@ import org.apache.streampark.common.util.Implicits.JavaMap
case class SubmitResponse(
var sparkAppId: String,
+ trackingUrl: String,
sparkProperties: JavaMap[String, String])
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
index a62ed677c..9287f72db 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
@@ -56,7 +56,8 @@ object SparkShimsProxy extends Logger {
"ch.qos.logback",
"org.xml",
"org.w3c",
- "org.apache.hadoop")
+ "org.apache.hadoop",
+ "org.apache.spark.launcher")
def proxy[T](sparkVersion: SparkVersion, func: ClassLoader => T): T = {
val shimsClassLoader = getSparkShimsClassLoader(sparkVersion)
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
index 15cf4d60b..031400825 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
@@ -19,11 +19,12 @@ package org.apache.streampark.spark.client.impl
import
org.apache.streampark.common.conf.ConfigKeys.{KEY_SPARK_YARN_AM_NODE_LABEL,
KEY_SPARK_YARN_EXECUTOR_NODE_LABEL, KEY_SPARK_YARN_QUEUE,
KEY_SPARK_YARN_QUEUE_LABEL, KEY_SPARK_YARN_QUEUE_NAME}
import org.apache.streampark.common.enums.SparkExecutionMode
-import org.apache.streampark.common.util.HadoopUtils
+import org.apache.streampark.common.util.{HadoopUtils, YarnUtils}
import org.apache.streampark.common.util.Implicits._
import org.apache.streampark.spark.client.`trait`.SparkClientTrait
import org.apache.streampark.spark.client.bean._
+import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
@@ -74,17 +75,23 @@ object YarnClient extends SparkClientTrait {
// 3) launch
Try(launch(launcher)) match {
case Success(handle: SparkAppHandle) =>
- logger.info(s"[StreamPark][Spark][YarnClient] spark job:
${submitRequest.appName} is submit successful, " +
- s"appid: ${handle.getAppId}, " +
- s"state: ${handle.getState}")
- sparkHandles += handle.getAppId -> handle
- SubmitResponse(handle.getAppId, submitRequest.appProperties)
+ if (handle.getError.isPresent) {
+ logger.info(s"[StreamPark][Spark][YarnClient] spark job:
${submitRequest.appName} submit failed.")
+ throw handle.getError.get()
+ } else {
+ logger.info(s"[StreamPark][Spark][YarnClient] spark job:
${submitRequest.appName} submit successfully, " +
+ s"appid: ${handle.getAppId}, " +
+ s"state: ${handle.getState}")
+ sparkHandles += handle.getAppId -> handle
+ val trackingUrl =
YarnUtils.getYarnAppTrackingUrl(HadoopUtils.toApplicationId(handle.getAppId))
+ SubmitResponse(handle.getAppId, trackingUrl,
submitRequest.appProperties)
+ }
case Failure(e) => throw e
}
}
private def launch(sparkLauncher: SparkLauncher): SparkAppHandle = {
- logger.info("[StreamPark][Spark][YarnClient] The spark job starting")
+ logger.info("[StreamPark][Spark][YarnClient] The spark job start
submitting")
val submitFinished: CountDownLatch = new CountDownLatch(1)
val sparkAppHandle = sparkLauncher.startApplication(new
SparkAppHandle.Listener() {
override def infoChanged(sparkAppHandle: SparkAppHandle): Unit = {}
@@ -92,14 +99,22 @@ object YarnClient extends SparkClientTrait {
if (handle.getAppId != null) {
logger.info(s"${handle.getAppId} stateChanged :
${handle.getState.toString}")
} else {
- logger.info("stateChanged :{}", handle.getState.toString)
- }
- if (SparkAppHandle.State.FAILED == handle.getState) {
- logger.error("Task run failure stateChanged :{}",
handle.getState.toString)
+ logger.info("stateChanged : {}", handle.getState.toString)
}
if (handle.getAppId != null && submitFinished.getCount != 0) {
+ // Task submission succeeded
submitFinished.countDown()
}
+ if (handle.getState.isFinal) {
+ if (StringUtils.isNotBlank(handle.getAppId) &&
sparkHandles.containsKey(handle.getAppId)) {
+ sparkHandles.remove(handle.getAppId)
+ }
+ if (submitFinished.getCount != 0) {
+ // Task submission failed
+ submitFinished.countDown()
+ }
+ logger.info("Task is end, final state : {}",
handle.getState.toString)
+ }
}
})
submitFinished.await()
@@ -107,7 +122,11 @@ object YarnClient extends SparkClientTrait {
}
private def prepareSparkLauncher(submitRequest: SubmitRequest) = {
- new SparkLauncher()
+ val env = new JavaHashMap[String, String]()
+ if (StringUtils.isNotBlank(submitRequest.hadoopUser)) {
+ env.put("HADOOP_USER_NAME", submitRequest.hadoopUser)
+ }
+ new SparkLauncher(env)
.setSparkHome(submitRequest.sparkVersion.sparkHome)
.setAppResource(submitRequest.userJarPath)
.setMainClass(submitRequest.appMain)