This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new af655696f [Feature] Support spark job status tracking (#3843)
af655696f is described below
commit af655696f3361946c792a32214908b3061675ba7
Author: lenoxzhao <[email protected]>
AuthorDate: Fri Jul 19 15:43:08 2024 +0800
[Feature] Support spark job status tracking (#3843)
* feat: add spark job state tracking
* feat: adjust base on updated code and add resource monitoring
* fix: fix e2e build failure
* feature: support spark parameters configuring
* feature: change cancel operation to stop operation
* fix: modify comment
* improve: remove flink related feature in SparkApplicationInfoService
---------
Co-authored-by: benjobs <[email protected]>
---
.../main/assembly/script/schema/mysql-schema.sql | 19 +
.../base/config/AsyncExecutorPoolConfig.java | 16 +
.../console/core/bean/AlertTemplate.java | 33 ++
.../controller/SparkApplicationController.java | 14 +-
.../console/core/entity/SparkApplication.java | 16 +-
.../console/core/entity/SparkApplicationLog.java | 51 +++
.../console/core/enums/SparkAppStateEnum.java | 74 ++--
.../console/core/enums/SparkOperationEnum.java | 27 +-
.../console/core/enums/SparkOptionStateEnum.java | 39 ++-
.../core/mapper/SparkApplicationLogMapper.java | 9 +-
.../streampark/console/core/metrics/spark/Job.java | 67 ++++
.../console/core/metrics/spark/SparkExecutor.java | 16 +-
.../core/service/SparkApplicationLogService.java | 44 +++
.../application/SparkApplicationActionService.java | 8 +-
.../application/SparkApplicationInfoService.java | 33 --
.../impl/SparkApplicationActionServiceImpl.java | 246 +++++--------
.../impl/SparkApplicationInfoServiceImpl.java | 137 +-------
.../service/impl/SparkAppBuildPipeServiceImpl.java | 20 +-
.../impl/SparkApplicationLogServiceImpl.java | 57 +++
.../console/core/watcher/SparkAppHttpWatcher.java | 389 +++++++++++++++++++++
.../streampark/spark/client/SparkClient.scala | 8 +-
.../{CancelRequest.scala => StopRequest.scala} | 3 +-
.../{CancelResponse.scala => StopResponse.scala} | 2 +-
.../spark/client/bean/SubmitResponse.scala | 1 +
.../SparkConfiguration.scala} | 11 +-
.../spark/client/proxy/SparkShimsProxy.scala | 10 +-
.../spark/client/SparkClientEndpoint.scala | 8 +-
.../spark/client/impl/YarnApplicationClient.scala | 102 +++---
.../spark/client/trait/SparkClientTrait.scala | 22 +-
29 files changed, 964 insertions(+), 518 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 41495bc71..69abc8634 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -632,4 +632,23 @@ create table `t_spark_app` (
index `inx_team` (`team_id`) using btree
) engine=innodb auto_increment=100000 default charset=utf8mb4
collate=utf8mb4_general_ci;
+
+-- ----------------------------
+-- table structure for t_spark_log
+-- ----------------------------
+drop table if exists `t_spark_log`;
+create table `t_spark_log` (
+ `id` bigint not null auto_increment,
+ `app_id` bigint default null,
+ `spark_app_id` varchar(64) collate utf8mb4_general_ci default null,
+ `track_url` varchar(255) collate utf8mb4_general_ci default null,
+ `success` tinyint default null,
+ `exception` text collate utf8mb4_general_ci,
+ `option_time` datetime default null,
+ `option_name` tinyint default null,
+ `user_id` bigint default null,
+ primary key (`id`) using btree
+) engine=innodb auto_increment=100000 default charset=utf8mb4
collate=utf8mb4_general_ci;
+
+
set foreign_key_checks = 1;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
index 98d9f3f9c..5d51b3df1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
@@ -66,6 +66,22 @@ public class AsyncExecutorPoolConfig extends
AsyncConfigurerSupport {
ThreadUtils.threadFactory("flink-restapi-watching-executor-"));
}
+ /**
+ * Create a ThreadPoolTaskExecutor for SparkAppHttpWatcher.
+ *
+ * @return Executor
+ */
+ @Bean("sparkRestAPIWatchingExecutor")
+ public Executor sparkRestAPIWatchingExecutor() {
+ return new ThreadPoolExecutor(
+ Runtime.getRuntime().availableProcessors() * 5,
+ Runtime.getRuntime().availableProcessors() * 10,
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(1024),
+ ThreadUtils.threadFactory("spark-cluster-watching-executor-"));
+ }
+
/**
* Create a ThreadPoolTaskExecutor for FlinkClusterWatcher.
*
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index dee7241b3..ce24a1e9f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -19,12 +19,15 @@ package org.apache.streampark.console.core.bean;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkExecutionMode;
+import org.apache.streampark.common.enums.SparkExecutionMode;
import org.apache.streampark.common.util.DateUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.enums.CheckPointStatusEnum;
import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
+import org.apache.streampark.console.core.enums.SparkAppStateEnum;
import lombok.Data;
@@ -140,6 +143,25 @@ public class AlertTemplate implements Serializable {
.build();
}
+ public static AlertTemplate of(SparkApplication application,
SparkAppStateEnum appState) {
+ return new AlertTemplateBuilder()
+ .setDuration(application.getStartTime(), application.getEndTime())
+ .setJobName(application.getJobName())
+ .setLink(application.getSparkExecutionMode(),
application.getJobId())
+ .setStartTime(application.getStartTime())
+ .setEndTime(application.getEndTime())
+ .setRestart(application.isNeedRestartOnFailed(),
application.getRestartCount())
+ .setRestartIndex(application.getRestartCount())
+ .setTotalRestart(application.getRestartSize())
+ .setType(1)
+ .setTitle(
+ String.format(
+ "%s %s %s", ALERT_TITLE_PREFIX, application.getJobName(),
appState.name()))
+ .setSubject(
+ String.format("%s %s %s", ALERT_SUBJECT_PREFIX,
application.getJobName(), appState))
+ .setStatus(appState.name())
+ .build();
+ }
private static class AlertTemplateBuilder {
private final AlertTemplate alertTemplate = new AlertTemplate();
@@ -218,6 +240,17 @@ public class AlertTemplate implements Serializable {
return this;
}
+ public AlertTemplateBuilder setLink(SparkExecutionMode mode, String
appId) {
+ if (SparkExecutionMode.isYarnMode(mode)) {
+ String format = "%s/proxy/%s/";
+ String url = String.format(format,
YarnUtils.getRMWebAppURL(false), appId);
+ alertTemplate.setLink(url);
+ } else {
+ alertTemplate.setLink(null);
+ }
+ return this;
+ }
+
public AlertTemplateBuilder setCpFailureRateInterval(String
cpFailureRateInterval) {
alertTemplate.setCpFailureRateInterval(cpFailureRateInterval);
return this;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
index 4063710f6..2b413c760 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
@@ -24,12 +24,12 @@ import
org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.core.annotation.AppUpdated;
import org.apache.streampark.console.core.entity.ApplicationBackUp;
-import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.SparkApplication;
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
import org.apache.streampark.console.core.enums.AppExistsStateEnum;
import org.apache.streampark.console.core.service.ApplicationBackUpService;
-import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ResourceService;
+import org.apache.streampark.console.core.service.SparkApplicationLogService;
import
org.apache.streampark.console.core.service.application.SparkApplicationActionService;
import
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
import
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
@@ -70,7 +70,7 @@ public class SparkApplicationController {
private ApplicationBackUpService backUpService;
@Autowired
- private ApplicationLogService applicationLogService;
+ private SparkApplicationLogService applicationLogService;
@Autowired
private ResourceService resourceService;
@@ -153,8 +153,8 @@ public class SparkApplicationController {
@PostMapping(value = "cancel")
@RequiresPermissions("app:cancel")
- public RestResponse cancel(SparkApplication app) throws Exception {
- applicationActionService.cancel(app);
+ public RestResponse stop(SparkApplication app) throws Exception {
+ applicationActionService.stop(app);
return RestResponse.success();
}
@@ -209,8 +209,8 @@ public class SparkApplicationController {
}
@PostMapping("optionlog")
- public RestResponse optionlog(ApplicationLog applicationLog, RestRequest
request) {
- IPage<ApplicationLog> applicationList =
applicationLogService.getPage(applicationLog, request);
+ public RestResponse optionlog(SparkApplicationLog applicationLog,
RestRequest request) {
+ IPage<SparkApplicationLog> applicationList =
applicationLogService.getPage(applicationLog, request);
return RestResponse.success(applicationList);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
index 9c4be43e5..c07e2b160 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
@@ -32,6 +32,7 @@ import org.apache.streampark.console.core.bean.Dependency;
import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.enums.ResourceFromEnum;
+import org.apache.streampark.console.core.enums.SparkAppStateEnum;
import org.apache.streampark.console.core.metrics.flink.JobsOverview;
import org.apache.streampark.console.core.utils.YarnQueueLabelExpression;
import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates;
@@ -285,19 +286,16 @@ public class SparkApplication extends BaseEntity {
}
/**
- * Determine if a FlinkAppState requires tracking.
+ * Determine if a SparkAppState requires tracking.
*
* @return 1: need to be tracked | 0: no need to be tracked.
*/
public Boolean shouldTracking() {
switch (getStateEnum()) {
case ADDED:
- case CREATED:
case FINISHED:
case FAILED:
- case CANCELED:
- case TERMINATED:
- case POS_TERMINATED:
+ case KILLED:
return false;
default:
return true;
@@ -312,15 +310,11 @@ public class SparkApplication extends BaseEntity {
public boolean isCanBeStart() {
switch (getStateEnum()) {
case ADDED:
- case CREATED:
case FAILED:
- case CANCELED:
case FINISHED:
case LOST:
- case TERMINATED:
case SUCCEEDED:
case KILLED:
- case POS_TERMINATED:
return true;
default:
return false;
@@ -338,8 +332,8 @@ public class SparkApplication extends BaseEntity {
}
@JsonIgnore
- public FlinkAppStateEnum getStateEnum() {
- return FlinkAppStateEnum.of(state);
+ public SparkAppStateEnum getStateEnum() {
+ return SparkAppStateEnum.of(state);
}
@JsonIgnore
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationLog.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationLog.java
new file mode 100644
index 000000000..8c51c92ed
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationLog.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Date;
+
+@Data
+@TableName("t_spark_log")
+@Slf4j
+public class SparkApplicationLog {
+
+ @TableId(type = IdType.AUTO)
+ private Long id;
+ /** appId */
+ private Long appId;
+ /** applicationId */
+ private String sparkAppId;
+ /** tracking url of current spark application */
+ private String trackUrl;
+ /** start status */
+ private Boolean success;
+ /** option name */
+ private Integer optionName;
+ /** option time */
+ private Date optionTime;
+ /** exception at the start */
+ private String exception;
+ /** The user who operates the application */
+ private Long userId;
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkAppStateEnum.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkAppStateEnum.java
index 822e32ba6..9e9847670 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkAppStateEnum.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkAppStateEnum.java
@@ -26,75 +26,47 @@ public enum SparkAppStateEnum {
/** Added new job to database. */
ADDED(0),
- /**
- * The job has been received by the Dispatcher, and is waiting for the job
manager to be created.
- */
- INITIALIZING(1),
+ /** (From Yarn)Application which was just created. */
+ NEW(1),
- /** Job is newly created, no task has started to run. */
- CREATED(2),
+ /** (From Yarn)Application which is being saved. */
+ NEW_SAVING(2),
/** Application which is currently running. */
STARTING(3),
- /** Application which is currently running. */
- RESTARTING(4),
+ /** (From Yarn)Application which has been submitted. */
+ SUBMITTED(4),
- /** Some tasks are scheduled or running, some may be pending, some may be
finished. */
- RUNNING(5),
+ /** (From Yarn)Application has been accepted by the scheduler. */
+ ACCEPTED(5),
/** The job has failed and is currently waiting for the cleanup to
complete. */
- FAILING(6),
-
- /** The job has failed with a non-recoverable task failure. */
- FAILED(7),
-
- /** Job is being cancelled. */
- CANCELLING(8),
-
- /** Job has been cancelled. */
- CANCELED(9),
+ RUNNING(6),
- /** All the job's tasks have successfully finished. */
- FINISHED(10),
+ /** (From Yarn)Application which finished successfully. */
+ FINISHED(7),
- /**
- * The job has been suspended which means that it has been stopped but not
been removed from a
- * potential HA job store.
- */
- SUSPENDED(11),
-
- /** The job is currently reconciling and waits for task execution report
to recover state. */
- RECONCILING(12),
+ /** (From Yarn)Application which failed. */
+ FAILED(8),
/** Loss of mapping. */
- LOST(13),
+ LOST(9),
/** Mapping. */
- MAPPING(14),
+ MAPPING(10),
/** Other statuses. */
- OTHER(15),
+ OTHER(11),
/** Has rollback. */
- REVOKED(16),
-
- /**
- * Lost track of Spark job temporarily. A complete loss of Spark job
tracking translates into LOST
- * state.
- */
- @Deprecated
- SILENT(17),
-
- /** Spark job has terminated vaguely, maybe FINISHED, CANCELED or FAILED.
*/
- TERMINATED(18),
+ REVOKED(12),
- /** Spark job has terminated vaguely, maybe FINISHED, CANCELED or FAILED.
*/
- @Deprecated
- POS_TERMINATED(19),
+ /** Spark job has being cancelling(killing) by streampark */
+ STOPPING(13),
/** Job SUCCEEDED on yarn. */
- SUCCEEDED(20),
+ SUCCEEDED(14),
/** Has killed in Yarn. */
KILLED(-9);
@@ -125,13 +97,11 @@ public enum SparkAppStateEnum {
public static boolean isEndState(Integer appState) {
SparkAppStateEnum sparkAppStateEnum = SparkAppStateEnum.of(appState);
- return SparkAppStateEnum.CANCELED == sparkAppStateEnum
- || SparkAppStateEnum.FAILED == sparkAppStateEnum
+ return SparkAppStateEnum.FAILED == sparkAppStateEnum
|| SparkAppStateEnum.KILLED == sparkAppStateEnum
|| SparkAppStateEnum.FINISHED == sparkAppStateEnum
|| SparkAppStateEnum.SUCCEEDED == sparkAppStateEnum
- || SparkAppStateEnum.LOST == sparkAppStateEnum
- || SparkAppStateEnum.TERMINATED == sparkAppStateEnum;
+ || SparkAppStateEnum.LOST == sparkAppStateEnum;
}
public static boolean isLost(Integer appState) {
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOperationEnum.java
similarity index 63%
copy from
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
copy to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOperationEnum.java
index d2582dd53..4d9ee1ae2 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOperationEnum.java
@@ -15,14 +15,25 @@
* limitations under the License.
*/
-package org.apache.streampark.spark.client.bean
+package org.apache.streampark.console.core.enums;
-import javax.annotation.Nullable
+import lombok.Getter;
-import java.util.{Map => JavaMap}
+import java.util.Arrays;
-case class SubmitResponse(
- clusterId: String,
- sparkConfig: JavaMap[String, String],
- @Nullable jobId: String = "",
- @Nullable jobManagerUrl: String = "")
+/** Spark Operation type */
+@Getter
+public enum SparkOperationEnum {
+
+ RELEASE(0), START(1), STOP(2);
+
+ private final int value;
+
+ SparkOperationEnum(int value) {
+ this.value = value;
+ }
+
+ public static SparkOperationEnum of(Integer option) {
+ return Arrays.stream(values()).filter((x) -> x.value ==
option).findFirst().orElse(null);
+ }
+}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOptionStateEnum.java
similarity index 52%
copy from
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
copy to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOptionStateEnum.java
index 6b8e1bfa2..b3a3e03e8 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOptionStateEnum.java
@@ -15,21 +15,32 @@
* limitations under the License.
*/
-package org.apache.streampark.spark.client.bean
+package org.apache.streampark.console.core.enums;
-import org.apache.streampark.common.conf.SparkVersion
-import org.apache.streampark.common.enums.SparkExecutionMode
+import lombok.Getter;
-import javax.annotation.Nullable
+import java.util.Arrays;
-import java.util.{Map => JavaMap}
+/** Option status */
+@Getter
+public enum SparkOptionStateEnum {
-case class CancelRequest(
- id: Long,
- sparkVersion: SparkVersion,
- executionMode: SparkExecutionMode,
- @Nullable properties: JavaMap[String, Any],
- clusterId: String,
- jobId: String,
- withDrain: Boolean,
- nativeFormat: Boolean)
+ /** Application which is currently action: none. */
+ NONE(0),
+ /** Application which is currently action: releasing. */
+ RELEASING(1),
+ /** Application which is currently action: starting. */
+ STARTING(2),
+ /** Application which is currently action: stopping. */
+ STOPPING(3);
+
+ private final int value;
+
+ SparkOptionStateEnum(int value) {
+ this.value = value;
+ }
+
+ public static SparkOptionStateEnum of(Integer state) {
+ return Arrays.stream(values()).filter((x) -> x.value ==
state).findFirst().orElse(null);
+ }
+}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkApplicationLogMapper.java
similarity index 75%
copy from
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
copy to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkApplicationLogMapper.java
index d293947da..c9e77768a 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkApplicationLogMapper.java
@@ -15,6 +15,11 @@
* limitations under the License.
*/
-package org.apache.streampark.spark.client.bean
+package org.apache.streampark.console.core.mapper;
-case class CancelResponse(savePointDir: String)
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+public interface SparkApplicationLogMapper extends
BaseMapper<SparkApplicationLog> {
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/Job.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/Job.java
new file mode 100644
index 000000000..be28f6631
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/Job.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.metrics.spark;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+import scala.collection.Map;
+
+@Data
+public class Job implements Serializable {
+
+ @JsonProperty("jobId")
+ private Long id;
+
+ private String name;
+
+ private String submissionTime;
+
+ private String completionTime;
+
+ private List<Long> stageIds;
+
+ private String status;
+
+ private Integer numTasks;
+
+ private Integer numActiveTasks;
+
+ private Integer numCompletedTasks;
+
+ private Integer numSkippedTasks;
+
+ private Integer numFailedTasks;
+
+ private Integer numKilledTasks;
+
+ private Integer numCompletedIndices;
+
+ private Integer numActiveStages;
+
+ private Integer numCompletedStages;
+
+ private Integer numSkippedStages;
+
+ private Integer numFailedStages;
+
+ private Map<String, Object> killedTasksSummary;
+}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/SparkExecutor.java
similarity index 75%
copy from
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
copy to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/SparkExecutor.java
index d293947da..12e0c5e22 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/SparkExecutor.java
@@ -15,6 +15,18 @@
* limitations under the License.
*/
-package org.apache.streampark.spark.client.bean
+package org.apache.streampark.console.core.metrics.spark;
-case class CancelResponse(savePointDir: String)
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class SparkExecutor implements Serializable {
+
+ private Long memoryUsed;
+
+ private Long maxMemory;
+
+ private Long totalCores;
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java
new file mode 100644
index 000000000..6edee09ac
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/** This interface is used to record spark application operation logs */
+public interface SparkApplicationLogService extends
IService<SparkApplicationLog> {
+
+ /**
+ * Retrieves a page of {@link SparkApplicationLog} objects based on the
provided parameters.
+ *
+ * @param sparkApplicationLog The {@link SparkApplicationLog} object
containing the search criteria.
+ * @param request The {@link RestRequest} object used for pagination and
sorting.
+ * @return An {@link IPage} containing the retrieved {@link
SparkApplicationLog} objects.
+ */
+ IPage<SparkApplicationLog> getPage(SparkApplicationLog
sparkApplicationLog, RestRequest request);
+
+ /**
+ * remove application log by application id
+ *
+ * @param appId The id of the application to be removed
+ */
+ void removeByAppId(Long appId);
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java
index 3a1c086ca..682717cbe 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java
@@ -54,12 +54,12 @@ public interface SparkApplicationActionService extends
IService<SparkApplication
void revoke(Long appId) throws ApplicationException;
/**
- * Cancels the given application. Throws an exception if cancellation
fails.
+ * Stop the given application. Throws an exception if stop fails.
*
- * @param appParam the application to be canceled
- * @throws Exception if cancellation fails
+ * @param appParam the application to be stopped
+ * @throws Exception if stop fails
*/
- void cancel(SparkApplication appParam) throws Exception;
+ void stop(SparkApplication appParam) throws Exception;
/**
* Forces the given application to stop.
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationInfoService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationInfoService.java
index 81b9091e7..4100826c5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationInfoService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationInfoService.java
@@ -87,39 +87,6 @@ public interface SparkApplicationInfoService extends
IService<SparkApplication>
*/
boolean existsBySparkEnvId(Long sparkEnvId);
- /**
- * Checks if a job is running for a given cluster ID.
- *
- * @param clusterId The ID of the cluster.
- * @return true if a job is running for the given cluster ID; otherwise,
false.
- */
- boolean existsRunningByClusterId(Long clusterId);
-
- /**
- * Checks if there is a job that is associated with the given cluster ID.
- *
- * @param clusterId The ID of the cluster.
- * @return True if a job exists for the given cluster ID, false otherwise.
- */
- boolean existsByClusterId(Long clusterId);
-
- /**
- * Counts the number of items associated with the given cluster ID.
- *
- * @param clusterId The ID of the cluster.
- * @return The number of items associated with the given cluster ID.
- */
- Integer countByClusterId(Long clusterId);
-
- /**
- * Counts the number of items associated with the given cluster ID and
database type.
- *
- * @param clusterId The ID of the cluster.
- * @param dbType The type of the database.
- * @return The number of items associated with the given cluster ID and
database type.
- */
- Integer countAffectedByClusterId(Long clusterId, String dbType);
-
/**
* Gets the YARN name for the given application.
*
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index 46e447c75..4ddb22c51 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -21,9 +21,7 @@ import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
-import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkDevelopmentMode;
-import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.enums.SparkExecutionMode;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.AssertUtils;
@@ -31,52 +29,46 @@ import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApplicationException;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
import org.apache.streampark.console.core.entity.ApplicationConfig;
-import org.apache.streampark.console.core.entity.ApplicationLog;
-import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkSql;
import org.apache.streampark.console.core.entity.Resource;
-import org.apache.streampark.console.core.entity.SavePoint;
import org.apache.streampark.console.core.entity.SparkApplication;
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
import org.apache.streampark.console.core.entity.SparkEnv;
-import org.apache.streampark.console.core.enums.CheckPointTypeEnum;
import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
-import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
-import org.apache.streampark.console.core.enums.OperationEnum;
-import org.apache.streampark.console.core.enums.OptionStateEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
+import org.apache.streampark.console.core.enums.SparkAppStateEnum;
+import org.apache.streampark.console.core.enums.SparkOperationEnum;
+import org.apache.streampark.console.core.enums.SparkOptionStateEnum;
import org.apache.streampark.console.core.mapper.SparkApplicationMapper;
import org.apache.streampark.console.core.service.AppBuildPipeService;
import org.apache.streampark.console.core.service.ApplicationConfigService;
-import org.apache.streampark.console.core.service.ApplicationLogService;
-import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.ServiceHelper;
+import org.apache.streampark.console.core.service.SparkApplicationLogService;
import org.apache.streampark.console.core.service.SparkEnvService;
import org.apache.streampark.console.core.service.VariableService;
import
org.apache.streampark.console.core.service.application.SparkApplicationActionService;
import
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
-import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
-import org.apache.streampark.console.core.watcher.FlinkClusterWatcher;
+import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher;
import org.apache.streampark.flink.packer.pipeline.BuildResult;
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
import org.apache.streampark.spark.client.SparkClient;
-import org.apache.streampark.spark.client.bean.CancelRequest;
-import org.apache.streampark.spark.client.bean.CancelResponse;
+import org.apache.streampark.spark.client.bean.StopRequest;
+import org.apache.streampark.spark.client.bean.StopResponse;
import org.apache.streampark.spark.client.bean.SubmitRequest;
import org.apache.streampark.spark.client.bean.SubmitResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -91,7 +83,6 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.io.File;
-import java.net.URI;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
@@ -104,6 +95,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import static org.apache.hadoop.service.Service.STATE.STARTED;
+
@Slf4j
@Service
public class SparkApplicationActionServiceImpl
@@ -123,7 +116,7 @@ public class SparkApplicationActionServiceImpl
private ApplicationConfigService configService;
@Autowired
- private ApplicationLogService applicationLogService;
+ private SparkApplicationLogService applicationLogService;
@Autowired
private SparkEnvService sparkEnvService;
@@ -137,21 +130,15 @@ public class SparkApplicationActionServiceImpl
@Autowired
private AppBuildPipeService appBuildPipeService;
- @Autowired
- private FlinkClusterService flinkClusterService;
-
@Autowired
private VariableService variableService;
@Autowired
private ResourceService resourceService;
- @Autowired
- private FlinkClusterWatcher flinkClusterWatcher;
-
private final Map<Long, CompletableFuture<SubmitResponse>> startFutureMap
= new ConcurrentHashMap<>();
- private final Map<Long, CompletableFuture<CancelResponse>> cancelFutureMap
= new ConcurrentHashMap<>();
+ private final Map<Long, CompletableFuture<StopResponse>> stopFutureMap =
new ConcurrentHashMap<>();
@Override
public void revoke(Long appId) throws ApplicationException {
@@ -174,104 +161,75 @@ public class SparkApplicationActionServiceImpl
updateWrapper.set(SparkApplication::getRelease,
ReleaseStateEnum.NEED_RELEASE.get());
}
if (!application.isRunning()) {
- updateWrapper.set(SparkApplication::getState,
FlinkAppStateEnum.REVOKED.getValue());
+ updateWrapper.set(SparkApplication::getState,
SparkAppStateEnum.REVOKED.getValue());
}
baseMapper.update(null, updateWrapper);
}
@Override
public void restart(SparkApplication appParam) throws Exception {
- this.cancel(appParam);
+ this.stop(appParam);
this.start(appParam, false);
}
@Override
public void forcedStop(Long id) {
CompletableFuture<SubmitResponse> startFuture =
startFutureMap.remove(id);
- CompletableFuture<CancelResponse> cancelFuture =
cancelFutureMap.remove(id);
+ CompletableFuture<StopResponse> stopFuture = stopFutureMap.remove(id);
SparkApplication application = this.baseMapper.selectApp(id);
if (startFuture != null) {
startFuture.cancel(true);
}
- if (cancelFuture != null) {
- cancelFuture.cancel(true);
+ if (stopFuture != null) {
+ stopFuture.cancel(true);
}
- if (startFuture == null && cancelFuture == null) {
+ if (startFuture == null && stopFuture == null) {
this.doStopped(id);
}
}
@Override
- public void cancel(SparkApplication appParam) throws Exception {
- FlinkAppHttpWatcher.setOptionState(appParam.getId(),
OptionStateEnum.CANCELLING);
+ public void stop(SparkApplication appParam) throws Exception {
+ SparkAppHttpWatcher.setOptionState(appParam.getId(),
SparkOptionStateEnum.STOPPING);
SparkApplication application = getById(appParam.getId());
- application.setState(FlinkAppStateEnum.CANCELLING.getValue());
+ application.setState(SparkAppStateEnum.STOPPING.getValue());
- ApplicationLog applicationLog = new ApplicationLog();
- applicationLog.setOptionName(OperationEnum.CANCEL.getValue());
+ SparkApplicationLog applicationLog = new SparkApplicationLog();
+ applicationLog.setOptionName(SparkOperationEnum.STOP.getValue());
applicationLog.setAppId(application.getId());
- applicationLog.setJobManagerUrl(application.getJobManagerUrl());
+ applicationLog.setTrackUrl(application.getJobManagerUrl());
applicationLog.setOptionTime(new Date());
- applicationLog.setYarnAppId(application.getClusterId());
+ applicationLog.setSparkAppId(application.getJobId());
applicationLog.setUserId(serviceHelper.getUserId());
-
- if (appParam.getSavePointed()) {
- FlinkAppHttpWatcher.addSavepoint(application.getId());
-
application.setOptionState(OptionStateEnum.SAVEPOINTING.getValue());
- } else {
- application.setOptionState(OptionStateEnum.CANCELLING.getValue());
- }
-
application.setOptionTime(new Date());
this.baseMapper.updateById(application);
Long userId = serviceHelper.getUserId();
if (!application.getUserId().equals(userId)) {
- FlinkAppHttpWatcher.addCanceledApp(application.getId(), userId);
+ SparkAppHttpWatcher.addCanceledApp(application.getId(), userId);
}
SparkEnv sparkEnv =
sparkEnvService.getById(application.getVersionId());
- String clusterId = null;
- if (SparkExecutionMode.isYarnMode(application.getExecutionMode())) {
- clusterId = application.getAppId();
- }
-
Map<String, Object> properties = new HashMap<>();
- if
(SparkExecutionMode.isRemoteMode(application.getSparkExecutionMode())) {
- FlinkCluster cluster =
flinkClusterService.getById(application.getSparkClusterId());
- ApiAlertException.throwIfNull(
- cluster,
- String.format(
- "The clusterId=%s cannot be find, maybe the clusterId is
wrong or "
- + "the cluster has been deleted. Please contact the
Admin.",
- application.getSparkClusterId()));
- URI activeAddress = cluster.getRemoteURI();
- properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
- properties.put(RestOptions.PORT.key(), activeAddress.getPort());
- }
-
- CancelRequest cancelRequest = new CancelRequest(
- application.getId(),
- sparkEnv.getSparkVersion(),
- SparkExecutionMode.of(application.getExecutionMode()),
- properties,
- clusterId,
- application.getJobId(),
- appParam.getDrain(),
- appParam.getNativeFormat());
-
- final Date triggerTime = new Date();
- CompletableFuture<CancelResponse> cancelFuture = CompletableFuture
- .supplyAsync(() -> SparkClient.cancel(cancelRequest),
executorService);
-
- cancelFutureMap.put(application.getId(), cancelFuture);
-
- cancelFuture.whenComplete(
+ StopRequest stopRequest =
+ new StopRequest(
+ application.getId(),
+ sparkEnv.getSparkVersion(),
+ SparkExecutionMode.of(application.getExecutionMode()),
+ properties,
+ application.getJobId(),
+ appParam.getDrain(),
+ appParam.getNativeFormat());
+
+ CompletableFuture<StopResponse> stopFuture =
+ CompletableFuture.supplyAsync(() -> SparkClient.stop(stopRequest),
executorService);
+
+ stopFutureMap.put(application.getId(), stopFuture);
+ stopFuture.whenComplete(
(cancelResponse, throwable) -> {
- cancelFutureMap.remove(application.getId());
-
+ stopFutureMap.remove(application.getId());
if (throwable != null) {
String exception =
ExceptionUtils.stringifyException(throwable);
applicationLog.setException(exception);
@@ -281,31 +239,17 @@ public class SparkApplicationActionServiceImpl
if (throwable instanceof CancellationException) {
doStopped(application.getId());
} else {
- log.error("stop flink job failed.", throwable);
-
application.setOptionState(OptionStateEnum.NONE.getValue());
-
application.setState(FlinkAppStateEnum.FAILED.getValue());
+ log.error("stop spark job failed.", throwable);
+
application.setOptionState(SparkOptionStateEnum.NONE.getValue());
+
application.setState(SparkAppStateEnum.FAILED.getValue());
updateById(application);
-
- FlinkAppHttpWatcher.unWatching(application.getId());
+ SparkAppHttpWatcher.unWatching(application.getId());
}
return;
}
-
applicationLog.setSuccess(true);
// save log...
applicationLogService.save(applicationLog);
-
- if (cancelResponse != null && cancelResponse.savePointDir() !=
null) {
- String savePointDir = cancelResponse.savePointDir();
- log.info("savePoint path: {}", savePointDir);
- SavePoint savePoint = new SavePoint();
- savePoint.setPath(savePointDir);
- savePoint.setAppId(application.getId());
- savePoint.setLatest(true);
- savePoint.setType(CheckPointTypeEnum.SAVEPOINT.get());
- savePoint.setCreateTime(new Date());
- savePoint.setTriggerTime(triggerTime);
- }
});
}
@@ -317,24 +261,16 @@ public class SparkApplicationActionServiceImpl
ApiAlertException.throwIfTrue(
!application.isCanBeStart(), "[StreamPark] The application cannot
be started repeatedly.");
- if
(SparkExecutionMode.isRemoteMode(application.getSparkExecutionMode())) {
- checkBeforeStart(application);
- }
+ SparkEnv sparkEnv =
sparkEnvService.getByIdOrDefault(application.getVersionId());
+ ApiAlertException.throwIfNull(sparkEnv, "[StreamPark] cannot find the
spark version");
if
(SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) {
-
- ApiAlertException.throwIfTrue(
-
!applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(),
- "[StreamPark] The same task name is already running in the
yarn queue");
+ checkYarnBeforeStart(application);
}
AppBuildPipeline buildPipeline =
appBuildPipeService.getById(application.getId());
AssertUtils.notNull(buildPipeline);
- SparkEnv sparkEnv =
sparkEnvService.getByIdOrDefault(application.getVersionId());
-
- ApiAlertException.throwIfNull(sparkEnv, "[StreamPark] can no found
flink version");
-
// if manually started, clear the restart flag
if (!auto) {
application.setRestartCount(0);
@@ -342,7 +278,6 @@ public class SparkApplicationActionServiceImpl
if (!application.isNeedRestartOnFailed()) {
return;
}
- appParam.setSavePointed(true);
application.setRestartCount(application.getRestartCount() + 1);
}
@@ -350,8 +285,8 @@ public class SparkApplicationActionServiceImpl
starting(application);
String jobId = new JobID().toHexString();
- ApplicationLog applicationLog = new ApplicationLog();
- applicationLog.setOptionName(OperationEnum.START.getValue());
+ SparkApplicationLog applicationLog = new SparkApplicationLog();
+ applicationLog.setOptionName(SparkOperationEnum.START.getValue());
applicationLog.setAppId(application.getId());
applicationLog.setOptionTime(new Date());
applicationLog.setUserId(serviceHelper.getUserId());
@@ -376,6 +311,7 @@ public class SparkApplicationActionServiceImpl
if (SparkExecutionMode.YARN_CLUSTER ==
application.getSparkExecutionMode()
|| SparkExecutionMode.YARN_CLIENT ==
application.getSparkExecutionMode()) {
buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
+ application.setJobManagerUrl(YarnUtils.getRMWebAppURL(true));
}
// Get the args after placeholder replacement
@@ -401,7 +337,6 @@ public class SparkApplicationActionServiceImpl
.supplyAsync(() -> SparkClient.submit(submitRequest),
executorService);
startFutureMap.put(application.getId(), future);
-
future.whenComplete(
(response, throwable) -> {
// 1) remove Future
@@ -417,16 +352,17 @@ public class SparkApplicationActionServiceImpl
doStopped(application.getId());
} else {
SparkApplication app = getById(appParam.getId());
- app.setState(FlinkAppStateEnum.FAILED.getValue());
- app.setOptionState(OptionStateEnum.NONE.getValue());
+ app.setState(SparkAppStateEnum.FAILED.getValue());
+
app.setOptionState(SparkOptionStateEnum.NONE.getValue());
updateById(app);
- FlinkAppHttpWatcher.unWatching(appParam.getId());
+ SparkAppHttpWatcher.unWatching(appParam.getId());
}
return;
}
// 3) success
applicationLog.setSuccess(true);
+ // TODO: replace these with the Spark-specific configuration parameters
if (response.sparkConfig() != null) {
String jmMemory =
response.sparkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
if (jmMemory != null) {
@@ -438,21 +374,21 @@ public class SparkApplicationActionServiceImpl
}
}
application.setAppId(response.clusterId());
- if (StringUtils.isNoneEmpty(response.jobId())) {
- application.setJobId(response.jobId());
+ if (StringUtils.isNoneEmpty(response.sparkAppId())) {
+ application.setJobId(response.sparkAppId());
}
if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
application.setJobManagerUrl(response.jobManagerUrl());
- applicationLog.setJobManagerUrl(response.jobManagerUrl());
+ applicationLog.setTrackUrl(response.jobManagerUrl());
}
- applicationLog.setYarnAppId(response.clusterId());
+ applicationLog.setSparkAppId(response.sparkAppId());
application.setStartTime(new Date());
application.setEndTime(null);
// if start completed, will be added task to tracking queue
- FlinkAppHttpWatcher.setOptionState(appParam.getId(),
OptionStateEnum.STARTING);
- // FlinkAppHttpWatcher.doWatching(application);
+ SparkAppHttpWatcher.setOptionState(appParam.getId(),
SparkOptionStateEnum.STARTING);
+ SparkAppHttpWatcher.doWatching(application);
// update app
updateById(application);
@@ -487,7 +423,7 @@ public class SparkApplicationActionServiceImpl
}
private void starting(SparkApplication application) {
- application.setState(FlinkAppStateEnum.STARTING.getValue());
+ application.setState(SparkAppStateEnum.STARTING.getValue());
application.setOptionTime(new Date());
updateById(application);
}
@@ -508,8 +444,7 @@ public class SparkApplicationActionServiceImpl
FlinkSql flinkSql =
flinkSqlService.getEffective(application.getId(), false);
AssertUtils.notNull(flinkSql);
// 1) dist_userJar
- // todo
- String sqlDistJar = serviceHelper.getFlinkSqlClientJar(null);
+ String sqlDistJar =
serviceHelper.getSparkSqlClientJar(sparkEnv);
// 2) appConfig
appConf = applicationConfig == null
? null
@@ -598,18 +533,7 @@ public class SparkApplicationActionServiceImpl
private Map<String, Object> getProperties(SparkApplication application) {
Map<String, Object> properties = new
HashMap<>(application.getOptionMap());
- if
(SparkExecutionMode.isRemoteMode(application.getSparkExecutionMode())) {
- FlinkCluster cluster =
flinkClusterService.getById(application.getSparkClusterId());
- ApiAlertException.throwIfNull(
- cluster,
- String.format(
- "The clusterId=%s can't be find, maybe the clusterId is
wrong or "
- + "the cluster has been deleted. Please contact the
Admin.",
- application.getSparkClusterId()));
- URI activeAddress = cluster.getRemoteURI();
- properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
- properties.put(RestOptions.PORT.key(), activeAddress.getPort());
- } else if
(SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) {
+ if
(SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) {
String yarnQueue = (String)
application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_QUEUE());
String yarnLabelExpr = (String)
application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_NODE_LABEL());
Optional.ofNullable(yarnQueue)
@@ -618,29 +542,19 @@ public class SparkApplicationActionServiceImpl
.ifPresent(yLabel ->
properties.put(ConfigKeys.KEY_YARN_APP_NODE_LABEL(), yLabel));
}
- if (application.getAllowNonRestored()) {
-
properties.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(),
true);
- }
-
Map<String, String> dynamicProperties = PropertiesUtils
.extractDynamicPropertiesAsJava(application.getDynamicProperties());
properties.putAll(dynamicProperties);
- ResolveOrder resolveOrder =
ResolveOrder.of(application.getResolveOrder());
- if (resolveOrder != null) {
- properties.put(CoreOptions.CLASSLOADER_RESOLVE_ORDER.key(),
resolveOrder.getName());
- }
-
return properties;
}
private void doStopped(Long id) {
SparkApplication application = getById(id);
- application.setOptionState(OptionStateEnum.NONE.getValue());
- application.setState(FlinkAppStateEnum.CANCELED.getValue());
+ application.setOptionState(SparkOptionStateEnum.NONE.getValue());
+ application.setState(SparkAppStateEnum.KILLED.getValue());
application.setOptionTime(new Date());
updateById(application);
- // re-tracking flink job on kubernetes and logging exception
- FlinkAppHttpWatcher.unWatching(application.getId());
+ SparkAppHttpWatcher.unWatching(application.getId());
// kill application
if
(SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) {
try {
@@ -655,18 +569,14 @@ public class SparkApplicationActionServiceImpl
}
}
- /* check flink cluster before job start job */
- private void checkBeforeStart(SparkApplication application) {
- SparkEnv sparkEnv = sparkEnvService.getByAppId(application.getId());
- ApiAlertException.throwIfNull(sparkEnv, "[StreamPark] can no found
flink version");
-
+ /* check the yarn cluster state before starting the job */
+ private void checkYarnBeforeStart(SparkApplication application) {
+ STATE yarnState = HadoopUtils.yarnClient().getServiceState();
ApiAlertException.throwIfFalse(
- flinkClusterService.existsByFlinkEnvId(sparkEnv.getId()),
- "[StreamPark] The flink cluster don't exist, please check it");
-
- FlinkCluster flinkCluster =
flinkClusterService.getById(application.getSparkClusterId());
- ApiAlertException.throwIfFalse(
- flinkClusterWatcher.getClusterState(flinkCluster) ==
ClusterState.RUNNING,
- "[StreamPark] The flink cluster not running, please start it");
+ yarnState == STARTED,
+ "[StreamPark] The yarn cluster service state is " +
yarnState.name() + ", please check it");
+ ApiAlertException.throwIfTrue(
+
!applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(),
+ "[StreamPark] The same task name is already running in the yarn
queue");
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java
index 58a5bb125..4d4aba24e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java
@@ -26,27 +26,19 @@ import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.common.util.YarnUtils;
-import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.streampark.console.base.exception.ApplicationException;
-import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.entity.SparkEnv;
import org.apache.streampark.console.core.enums.AppExistsStateEnum;
import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
+import org.apache.streampark.console.core.enums.SparkAppStateEnum;
import org.apache.streampark.console.core.mapper.SparkApplicationMapper;
-import org.apache.streampark.console.core.metrics.flink.JobsOverview;
import org.apache.streampark.console.core.runner.EnvInitializer;
-import org.apache.streampark.console.core.service.FlinkClusterService;
-import org.apache.streampark.console.core.service.SavePointService;
import org.apache.streampark.console.core.service.SparkEnvService;
import
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
-import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
-import org.apache.streampark.console.core.watcher.FlinkClusterWatcher;
+import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher;
import org.apache.streampark.flink.core.conf.ParameterCli;
-import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
-import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -96,94 +88,16 @@ public class SparkApplicationInfoServiceImpl
@Autowired
private SparkEnvService sparkEnvService;
- @Autowired
- private SavePointService savePointService;
-
@Autowired
private EnvInitializer envInitializer;
- @Autowired
- private FlinkK8sWatcher k8SFlinkTrackMonitor;
-
- @Autowired
- private FlinkClusterService flinkClusterService;
-
- @Autowired
- private FlinkClusterWatcher flinkClusterWatcher;
-
@Override
public Map<String, Serializable> getDashboardDataMap(Long teamId) {
- JobsOverview.Task overview = new JobsOverview.Task();
- Integer totalJmMemory = 0;
- Integer totalTmMemory = 0;
- Integer totalTm = 0;
- Integer totalSlot = 0;
- Integer availableSlot = 0;
- Integer runningJob = 0;
-
- // stat metrics from other than kubernetes mode
- for (Application app : FlinkAppHttpWatcher.getWatchingApps()) {
- if (!teamId.equals(app.getTeamId())) {
- continue;
- }
- if (app.getJmMemory() != null) {
- totalJmMemory += app.getJmMemory();
- }
- if (app.getTmMemory() != null) {
- totalTmMemory += app.getTmMemory() * (app.getTotalTM() == null
? 1 : app.getTotalTM());
- }
- if (app.getTotalTM() != null) {
- totalTm += app.getTotalTM();
- }
- if (app.getTotalSlot() != null) {
- totalSlot += app.getTotalSlot();
- }
- if (app.getAvailableSlot() != null) {
- availableSlot += app.getAvailableSlot();
- }
- if (app.getState() == FlinkAppStateEnum.RUNNING.getValue()) {
- runningJob++;
- }
- JobsOverview.Task task = app.getOverview();
- if (task != null) {
- overview.setTotal(overview.getTotal() + task.getTotal());
- overview.setCreated(overview.getCreated() + task.getCreated());
- overview.setScheduled(overview.getScheduled() +
task.getScheduled());
- overview.setDeploying(overview.getDeploying() +
task.getDeploying());
- overview.setRunning(overview.getRunning() + task.getRunning());
- overview.setFinished(overview.getFinished() +
task.getFinished());
- overview.setCanceling(overview.getCanceling() +
task.getCanceling());
- overview.setCanceled(overview.getCanceled() +
task.getCanceled());
- overview.setFailed(overview.getFailed() + task.getFailed());
- overview.setReconciling(overview.getReconciling() +
task.getReconciling());
- }
- }
-
- // merge metrics from flink kubernetes cluster
- FlinkMetricCV k8sMetric =
k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString());
- if (k8sMetric != null) {
- totalJmMemory += k8sMetric.totalJmMemory();
- totalTmMemory += k8sMetric.totalTmMemory();
- totalTm += k8sMetric.totalTm();
- totalSlot += k8sMetric.totalSlot();
- availableSlot += k8sMetric.availableSlot();
- runningJob += k8sMetric.runningJob();
- overview.setTotal(overview.getTotal() + k8sMetric.totalJob());
- overview.setRunning(overview.getRunning() +
k8sMetric.runningJob());
- overview.setFinished(overview.getFinished() +
k8sMetric.finishedJob());
- overview.setCanceled(overview.getCanceled() +
k8sMetric.cancelledJob());
- overview.setFailed(overview.getFailed() + k8sMetric.failedJob());
- }
// result json
Map<String, Serializable> dashboardDataMap = new HashMap<>(8);
- dashboardDataMap.put("task", overview);
- dashboardDataMap.put("jmMemory", totalJmMemory);
- dashboardDataMap.put("tmMemory", totalTmMemory);
- dashboardDataMap.put("totalTM", totalTm);
- dashboardDataMap.put("availableSlot", availableSlot);
- dashboardDataMap.put("totalSlot", totalSlot);
- dashboardDataMap.put("runningJob", runningJob);
+ // TODO: Tasks running metrics for presentation
+ // dashboardDataMap.put("metrics key", "metrics value");
return dashboardDataMap;
}
@@ -203,14 +117,6 @@ public class SparkApplicationInfoServiceImpl
}
envInitializer.checkSparkEnv(application.getStorageType(),
sparkEnv);
envInitializer.storageInitialize(application.getStorageType());
-
- if (SparkExecutionMode.REMOTE ==
application.getSparkExecutionMode()) {
- FlinkCluster flinkCluster =
flinkClusterService.getById(application.getSparkClusterId());
- boolean conned =
flinkClusterWatcher.verifyClusterConnection(flinkCluster);
- if (!conned) {
- throw new ApiAlertException("the target cluster is
unavailable, please check!");
- }
- }
return true;
} catch (Exception e) {
log.error(ExceptionUtils.stringifyException(e));
@@ -221,10 +127,10 @@ public class SparkApplicationInfoServiceImpl
@Override
public boolean checkAlter(SparkApplication appParam) {
Long appId = appParam.getId();
- if (FlinkAppStateEnum.CANCELED != appParam.getStateEnum()) {
+ if (SparkAppStateEnum.KILLED != appParam.getStateEnum()) {
return false;
}
- long cancelUserId = FlinkAppHttpWatcher.getCanceledJobUserId(appId);
+ long cancelUserId = SparkAppHttpWatcher.getCanceledJobUserId(appId);
long appUserId = appParam.getUserId();
return cancelUserId != -1 && cancelUserId != appUserId;
}
@@ -241,37 +147,6 @@ public class SparkApplicationInfoServiceImpl
new
LambdaQueryWrapper<SparkApplication>().eq(SparkApplication::getUserId, userId));
}
- @Override
- public boolean existsRunningByClusterId(Long clusterId) {
- return baseMapper.existsRunningJobByClusterId(clusterId)
- || FlinkAppHttpWatcher.getWatchingApps().stream()
- .anyMatch(
- application ->
clusterId.equals(application.getFlinkClusterId())
- && FlinkAppStateEnum.RUNNING == application
- .getStateEnum());
- }
-
- @Override
- public boolean existsByClusterId(Long clusterId) {
- return baseMapper.exists(
- new LambdaQueryWrapper<SparkApplication>()
- .eq(SparkApplication::getSparkClusterId, clusterId));
- }
-
- @Override
- public Integer countByClusterId(Long clusterId) {
- return baseMapper
- .selectCount(
- new LambdaQueryWrapper<SparkApplication>()
- .eq(SparkApplication::getSparkClusterId, clusterId))
- .intValue();
- }
-
- @Override
- public Integer countAffectedByClusterId(Long clusterId, String dbType) {
- return baseMapper.countAffectedByClusterId(clusterId, dbType);
- }
-
@Override
public boolean existsBySparkEnvId(Long sparkEnvId) {
return baseMapper.exists(
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
index 94b9abf8a..36d63d037 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
@@ -32,11 +32,11 @@ import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.bean.Dependency;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
import org.apache.streampark.console.core.entity.ApplicationConfig;
-import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkSql;
import org.apache.streampark.console.core.entity.Message;
import org.apache.streampark.console.core.entity.Resource;
import org.apache.streampark.console.core.entity.SparkApplication;
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
import org.apache.streampark.console.core.entity.SparkEnv;
import org.apache.streampark.console.core.enums.CandidateTypeEnum;
import org.apache.streampark.console.core.enums.NoticeTypeEnum;
@@ -46,17 +46,17 @@ import
org.apache.streampark.console.core.enums.ResourceTypeEnum;
import
org.apache.streampark.console.core.mapper.ApplicationBuildPipelineMapper;
import org.apache.streampark.console.core.service.ApplicationBackUpService;
import org.apache.streampark.console.core.service.ApplicationConfigService;
-import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.MessageService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.ServiceHelper;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.SparkAppBuildPipeService;
+import org.apache.streampark.console.core.service.SparkApplicationLogService;
import org.apache.streampark.console.core.service.SparkEnvService;
import
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
import
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
-import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
+import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher;
import org.apache.streampark.flink.packer.maven.Artifact;
import org.apache.streampark.flink.packer.maven.DependencyInfo;
import org.apache.streampark.flink.packer.pipeline.BuildPipeline;
@@ -129,10 +129,10 @@ public class SparkAppBuildPipeServiceImpl
private SparkApplicationInfoService applicationInfoService;
@Autowired
- private ApplicationLogService applicationLogService;
+ private SparkApplicationLogService applicationLogService;
@Autowired
- private FlinkAppHttpWatcher flinkAppHttpWatcher;
+ private SparkAppHttpWatcher sparkAppHttpWatcher;
@Autowired
private ApplicationConfigService applicationConfigService;
@@ -157,7 +157,7 @@ public class SparkAppBuildPipeServiceImpl
checkBuildEnv(appId, forceBuild);
SparkApplication app = applicationManageService.getById(appId);
- ApplicationLog applicationLog = new ApplicationLog();
+ SparkApplicationLog applicationLog = new SparkApplicationLog();
applicationLog.setOptionName(RELEASE.getValue());
applicationLog.setAppId(app.getId());
applicationLog.setOptionTime(new Date());
@@ -202,8 +202,8 @@ public class SparkAppBuildPipeServiceImpl
app.setRelease(ReleaseStateEnum.RELEASING.get());
applicationManageService.updateRelease(app);
- if (flinkAppHttpWatcher.isWatchingApp(app.getId())) {
- flinkAppHttpWatcher.init();
+ if (sparkAppHttpWatcher.isWatchingApp(app.getId())) {
+ sparkAppHttpWatcher.init();
}
// 1) checkEnv
@@ -331,8 +331,8 @@ public class SparkAppBuildPipeServiceImpl
}
applicationManageService.updateRelease(app);
applicationLogService.save(applicationLog);
- if (flinkAppHttpWatcher.isWatchingApp(app.getId())) {
- flinkAppHttpWatcher.init();
+ if (sparkAppHttpWatcher.isWatchingApp(app.getId())) {
+ sparkAppHttpWatcher.init();
}
}
});
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationLogServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationLogServiceImpl.java
new file mode 100644
index 000000000..cb6e338af
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationLogServiceImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
+import org.apache.streampark.console.core.mapper.SparkApplicationLogMapper;
+import org.apache.streampark.console.core.service.SparkApplicationLogService;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+@Slf4j
+@Service
+@Transactional(propagation = Propagation.SUPPORTS, readOnly = true,
rollbackFor = Exception.class)
+public class SparkApplicationLogServiceImpl extends
ServiceImpl<SparkApplicationLogMapper, SparkApplicationLog>
+ implements
+ SparkApplicationLogService {
+
+ @Override
+ public IPage<SparkApplicationLog> getPage(SparkApplicationLog
sparkApplicationLog, RestRequest request) {
+ request.setSortField("option_time");
+ Page<SparkApplicationLog> page = MybatisPager.getPage(request);
+ LambdaQueryWrapper<SparkApplicationLog> queryWrapper = new
LambdaQueryWrapper<SparkApplicationLog>()
+ .eq(SparkApplicationLog::getAppId, sparkApplicationLog.getAppId());
+ return this.page(page, queryWrapper);
+ }
+
+ @Override
+ public void removeByAppId(Long appId) {
+ LambdaQueryWrapper<SparkApplicationLog> queryWrapper = new
LambdaQueryWrapper<SparkApplicationLog>()
+ .eq(SparkApplicationLog::getAppId, appId);
+ this.remove(queryWrapper);
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
new file mode 100644
index 000000000..c7e447cd2
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.watcher;
+
+import org.apache.streampark.common.util.YarnUtils;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.base.util.Tuple2;
+import org.apache.streampark.console.base.util.Tuple3;
+import org.apache.streampark.console.core.bean.AlertTemplate;
+import org.apache.streampark.console.core.entity.SparkApplication;
+import org.apache.streampark.console.core.enums.SparkAppStateEnum;
+import org.apache.streampark.console.core.enums.SparkOptionStateEnum;
+import org.apache.streampark.console.core.enums.StopFromEnum;
+import org.apache.streampark.console.core.metrics.spark.Job;
+import org.apache.streampark.console.core.metrics.spark.SparkExecutor;
+import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.alert.AlertService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationActionService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.hc.core5.util.Timeout;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@Component
+public class SparkAppHttpWatcher {
+
+ @Autowired
+ private SparkApplicationManageService applicationManageService;
+
+ @Autowired
+ private SparkApplicationActionService applicationActionService;
+
+ @Autowired
+ private SparkApplicationInfoService applicationInfoService;
+
+ @Autowired
+ private AlertService alertService;
+
+ @Qualifier("sparkRestAPIWatchingExecutor")
+ @Autowired
+ private Executor executorService;
+
+ // track interval every 5 seconds
+ public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
+
+ // option interval within 10 seconds
+ private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10);
+
+ private static final Timeout HTTP_TIMEOUT = Timeout.ofSeconds(5);
+
+ /**
+ * Record the status of the first tracking task, because after the task is
started, the overview
+ * of the task will be obtained during the first tracking
+ */
+ private static final Cache<Long, Byte> STARTING_CACHE =
+ Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
+
+ /** tracking task list */
+ private static final Map<Long, SparkApplication> WATCHING_APPS = new
ConcurrentHashMap<>(0);
+
+ /**
+ * <pre>
+ * StopFrom: Recording spark application stopped by streampark or stopped
by other actions
+ * </pre>
+ */
+ private static final Map<Long, StopFromEnum> STOP_FROM_MAP = new
ConcurrentHashMap<>(0);
+
+ /**
+ * Task canceled tracking list, record who cancelled the tracking task
Map<applicationId,userId>
+ */
+ private static final Map<Long, Long> CANCELLED_JOB_MAP = new
ConcurrentHashMap<>(0);
+
+ private static final Map<Long, SparkOptionStateEnum> OPTIONING = new
ConcurrentHashMap<>(0);
+
+ private Long lastWatchTime = 0L;
+
+ private Long lastOptionTime = 0L;
+
+ private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0");
+
+ @PostConstruct
+ public void init() {
+ WATCHING_APPS.clear();
+ List<SparkApplication> applications =
+ applicationManageService.list(
+ new LambdaQueryWrapper<SparkApplication>()
+ .eq(SparkApplication::getTracking, 1)
+ .ne(SparkApplication::getState,
SparkAppStateEnum.LOST.getValue()));
+ applications.forEach(
+ (app) -> {
+ WATCHING_APPS.put(app.getId(), app);
+ STARTING_CACHE.put(app.getId(), DEFAULT_FLAG_BYTE);
+ });
+ }
+
+ @PreDestroy
+ public void doStop() {
+ log.info(
+ "[StreamPark][SparkAppHttpWatcher] StreamPark Console will be
shutdown, persistent application to database.");
+ WATCHING_APPS.forEach((k, v) ->
applicationManageService.persistMetrics(v));
+ }
+
+ /**
+ * <strong>NOTE: The following conditions must be met for
execution</strong>
+ *
+ * <p><strong>1) Program started or page operated task, such as
start/stop, needs to return the
+ * state immediately. (the frequency of 1 second once, continued 10
seconds (10 times))</strong>
+ *
+ * <p><strong>2) Normal information obtain, once every 5 seconds</strong>
+ */
+ @Scheduled(fixedDelay = 1000)
+ public void start() {
+ Long timeMillis = System.currentTimeMillis();
+ if (lastWatchTime == null
+ || !OPTIONING.isEmpty()
+ || timeMillis - lastOptionTime <= OPTION_INTERVAL.toMillis()
+ || timeMillis - lastWatchTime >= WATCHING_INTERVAL.toMillis()) {
+ lastWatchTime = timeMillis;
+ WATCHING_APPS.forEach(this::watch);
+ }
+ }
+
+ @VisibleForTesting
+ public @Nullable SparkAppStateEnum tryQuerySparkAppState(@Nonnull Long
appId) {
+ SparkApplication app = WATCHING_APPS.get(appId);
+ return (app == null || app.getState() == null) ? null :
app.getStateEnum();
+ }
+
+ private void watch(Long id, SparkApplication application) {
+ executorService.execute(
+ () -> {
+ try {
+ getStateFromYarn(application);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ private StopFromEnum getAppStopFrom(Long appId) {
+ return STOP_FROM_MAP.getOrDefault(appId, StopFromEnum.NONE);
+ }
+
+ /**
+ * Query the job state from yarn and query the resource usage from spark
when job state is RUNNING
+ *
+ * @param application spark application
+ */
+ private void getStateFromYarn(SparkApplication application) throws
Exception {
+ SparkOptionStateEnum optionStateEnum =
OPTIONING.get(application.getId());
+
+ // query the status from the yarn rest Api
+ YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
+ if (yarnAppInfo == null) {
+ throw new RuntimeException("[StreamPark][SparkAppHttpWatcher]
getStateFromYarn failed!");
+ } else {
+ try {
+ String state = yarnAppInfo.getApp().getState();
+ SparkAppStateEnum sparkAppStateEnum =
SparkAppStateEnum.of(state);
+ if (SparkAppStateEnum.OTHER == sparkAppStateEnum) {
+ return;
+ }
+ if
(SparkAppStateEnum.isEndState(sparkAppStateEnum.getValue())) {
+ log.info(
+ "[StreamPark][SparkAppHttpWatcher] getStateFromYarn,
app {} was ended, jobId is {}, state is {}",
+ application.getId(),
+ application.getJobId(),
+ sparkAppStateEnum);
+ application.setEndTime(new Date());
+ }
+ if (SparkAppStateEnum.RUNNING == sparkAppStateEnum) {
+ Tuple3<Double, Double, Long> resourceStatus =
getResourceStatus(application);
+ double memoryUsed = resourceStatus.t1;
+ double maxMemory = resourceStatus.t2;
+ double totalCores = resourceStatus.t3;
+ log.info(
+ "[StreamPark][SparkAppHttpWatcher] getStateFromYarn,
app {} was running, jobId is {}, memoryUsed: {}MB, maxMemory: {}MB, totalCores:
{}",
+ application.getId(),
+ application.getJobId(),
+ String.format("%.2f", memoryUsed),
+ String.format("%.2f", maxMemory),
+ totalCores);
+ // TODO: Modify the table structure to persist the results
+ }
+ application.setState(sparkAppStateEnum.getValue());
+ cleanOptioning(optionStateEnum, application.getId());
+ doPersistMetrics(application, false);
+ if (SparkAppStateEnum.FAILED == sparkAppStateEnum
+ || SparkAppStateEnum.LOST == sparkAppStateEnum
+ || applicationInfoService.checkAlter(application)) {
+ doAlert(application, sparkAppStateEnum);
+ if (SparkAppStateEnum.FAILED == sparkAppStateEnum) {
+ applicationActionService.start(application, true);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("[StreamPark][SparkAppHttpWatcher]
getStateFromYarn failed!");
+ }
+ }
+ }
+
+ /**
+ * Calculate spark task progress from Spark rest api. (proxyed by yarn)
Only available when yarn
+ * application status is RUNNING.
+ *
+ * @param application
+ * @return task progress
+ * @throws Exception
+ */
+ private double getTasksProgress(SparkApplication application) throws
Exception {
+ Job[] jobs = httpJobsStatus(application);
+ if (jobs.length == 0) {
+ return 0.0;
+ }
+ Optional<Tuple2<Integer, Integer>> jobsSumOption =
+ Arrays.stream(jobs)
+ .map(job -> new Tuple2<>(job.getNumCompletedTasks(),
job.getNumTasks()))
+ .reduce((val1, val2) -> new Tuple2<>(val1.t1 + val2.t1,
val1.t2 + val2.t2));
+ Tuple2<Integer, Integer> jobsSum = jobsSumOption.get();
+ return jobsSum.t1 * 1.0 / jobsSum.t2;
+ }
+
+ private Tuple3<Double, Double, Long> getResourceStatus(SparkApplication
application) throws Exception {
+ SparkExecutor[] executors = httpExecutorsStatus(application);
+ if (executors.length == 0) {
+ return new Tuple3<>(0.0, 0.0, 0L);
+ }
+ SparkExecutor totalExecutor =
+ Arrays.stream(executors)
+ .reduce(
+ (e1, e2) -> {
+ SparkExecutor temp = new SparkExecutor();
+ temp.setMemoryUsed(e1.getMemoryUsed() +
e2.getMemoryUsed());
+ temp.setMaxMemory(e1.getMaxMemory() +
e2.getMaxMemory());
+ temp.setTotalCores(e1.getTotalCores() +
e2.getTotalCores());
+ return temp;
+ })
+ .get();
+ return new Tuple3<>(
+ totalExecutor.getMemoryUsed() * 1.0 / 1024 / 1024,
+ totalExecutor.getMaxMemory() * 1.0 / 1024 / 1024,
+ totalExecutor.getTotalCores());
+ }
+
+ private void doPersistMetrics(SparkApplication application, boolean
stopWatch) {
+ if (SparkAppStateEnum.isEndState(application.getState())) {
+ application.setOverview(null);
+ application.setTotalTM(null);
+ application.setTotalSlot(null);
+ application.setTotalTask(null);
+ application.setAvailableSlot(null);
+ application.setJmMemory(null);
+ application.setTmMemory(null);
+ unWatching(application.getId());
+ } else if (stopWatch) {
+ unWatching(application.getId());
+ } else {
+ WATCHING_APPS.put(application.getId(), application);
+ }
+ applicationManageService.persistMetrics(application);
+ }
+
+ private void cleanOptioning(SparkOptionStateEnum optionStateEnum, Long
key) {
+ if (optionStateEnum != null) {
+ lastOptionTime = System.currentTimeMillis();
+ OPTIONING.remove(key);
+ }
+ }
+
+ /** set current option state */
+ public static void setOptionState(Long appId, SparkOptionStateEnum state) {
+ log.info("[StreamPark][SparkAppHttpWatcher] setOptioning");
+ OPTIONING.put(appId, state);
+ if (SparkOptionStateEnum.STOPPING == state) {
+ STOP_FROM_MAP.put(appId, StopFromEnum.STREAMPARK);
+ }
+ }
+
+ public static void doWatching(SparkApplication application) {
+ log.info(
+ "[StreamPark][SparkAppHttpWatcher] add app to tracking, appId:{}",
application.getId());
+ WATCHING_APPS.put(application.getId(), application);
+ STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
+ }
+
+ public static void unWatching(Long appId) {
+ log.info("[StreamPark][SparkAppHttpWatcher] stop app, appId:{}",
appId);
+ WATCHING_APPS.remove(appId);
+ }
+
+ public static void addCanceledApp(Long appId, Long userId) {
+ log.info(
+ "[StreamPark][SparkAppHttpWatcher] addCanceledApp app appId:{},
useId:{}", appId, userId);
+ CANCELLED_JOB_MAP.put(appId, userId);
+ }
+
+ public static Long getCanceledJobUserId(Long appId) {
+ return CANCELLED_JOB_MAP.get(appId) == null ? Long.valueOf(-1) :
CANCELLED_JOB_MAP.get(appId);
+ }
+
+ public static Collection<SparkApplication> getWatchingApps() {
+ return WATCHING_APPS.values();
+ }
+
+ private YarnAppInfo httpYarnAppInfo(SparkApplication application) throws
Exception {
+ String reqURL = "ws/v1/cluster/apps/".concat(application.getJobId());
+ return yarnRestRequest(reqURL, YarnAppInfo.class);
+ }
+
+ private Job[] httpJobsStatus(SparkApplication application) throws
Exception {
+ String format = "proxy/%s/api/v1/applications/%s/jobs";
+ String reqURL = String.format(format, application.getJobId(),
application.getJobId());
+ return yarnRestRequest(reqURL, Job[].class);
+ }
+
+ private SparkExecutor[] httpExecutorsStatus(SparkApplication application)
throws Exception {
+ // "executor" is used for active executors only.
+ // "allexecutor" is used for all executors including the dead.
+ String format = "proxy/%s/api/v1/applications/%s/executors";
+ String reqURL = String.format(format, application.getJobId(),
application.getJobId());
+ return yarnRestRequest(reqURL, SparkExecutor[].class);
+ }
+
+ private <T> T yarnRestRequest(String url, Class<T> clazz) throws
IOException {
+ String result = YarnUtils.restRequest(url, HTTP_TIMEOUT);
+ if (null == result) {
+ return null;
+ }
+ return JacksonUtils.read(result, clazz);
+ }
+
+ public boolean isWatchingApp(Long id) {
+ return WATCHING_APPS.containsKey(id);
+ }
+
+ /**
+ * Describes the alarming behavior under abnormal operation for jobs
running in yarn mode.
+ *
+ * @param application spark application
+ * @param appState spark application state
+ */
+ private void doAlert(SparkApplication application, SparkAppStateEnum
appState) {
+ AlertTemplate alertTemplate = AlertTemplate.of(application, appState);
+ alertService.alert(application.getAlertId(), alertTemplate);
+ }
+}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
index 32d09b856..0abc5f4c9 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
@@ -32,15 +32,15 @@ object SparkClient extends Logger {
private[this] val SUBMIT_REQUEST =
"org.apache.streampark.spark.client.bean.SubmitRequest" -> "submit"
- private[this] val CANCEL_REQUEST =
- "org.apache.streampark.spark.client.bean.CancelRequest" -> "cancel"
+ private[this] val STOP_REQUEST =
+ "org.apache.streampark.spark.client.bean.StopRequest" -> "stop"
def submit(submitRequest: SubmitRequest): SubmitResponse = {
proxy[SubmitResponse](submitRequest, submitRequest.sparkVersion,
SUBMIT_REQUEST)
}
- def cancel(stopRequest: CancelRequest): CancelResponse = {
- proxy[CancelResponse](stopRequest, stopRequest.sparkVersion,
CANCEL_REQUEST)
+ def stop(stopRequest: StopRequest): StopResponse = {
+ proxy[StopResponse](stopRequest, stopRequest.sparkVersion, STOP_REQUEST)
}
private[this] def proxy[T: ClassTag](
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
similarity index 96%
rename from
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
rename to
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
index 6b8e1bfa2..4e2ab56bc 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
@@ -24,12 +24,11 @@ import javax.annotation.Nullable
import java.util.{Map => JavaMap}
-case class CancelRequest(
+case class StopRequest(
id: Long,
sparkVersion: SparkVersion,
executionMode: SparkExecutionMode,
@Nullable properties: JavaMap[String, Any],
- clusterId: String,
jobId: String,
withDrain: Boolean,
nativeFormat: Boolean)
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
similarity index 94%
copy from
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
copy to
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
index d293947da..c8655d19b 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
@@ -17,4 +17,4 @@
package org.apache.streampark.spark.client.bean
-case class CancelResponse(savePointDir: String)
+case class StopResponse(savePointDir: String)
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
index d2582dd53..5ea75af01 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
@@ -24,5 +24,6 @@ import java.util.{Map => JavaMap}
case class SubmitResponse(
clusterId: String,
sparkConfig: JavaMap[String, String],
+ var sparkAppId: String,
@Nullable jobId: String = "",
@Nullable jobManagerUrl: String = "")
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala
similarity index 75%
rename from
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
rename to
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala
index d293947da..99e97d3b6 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala
@@ -15,6 +15,13 @@
* limitations under the License.
*/
-package org.apache.streampark.spark.client.bean
+package org.apache.streampark.spark.client.conf
-case class CancelResponse(savePointDir: String)
/** Default Spark submission parameters applied when the user does not override them. */
object SparkConfiguration {

  /** Baseline resource settings: one core and 1g of memory for both driver and executors. */
  val defaultParameters: Map[String, Any] = Map(
    "spark.driver.cores" -> "1",
    "spark.driver.memory" -> "1g",
    "spark.executor.cores" -> "1",
    "spark.executor.memory" -> "1g")
}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
index 242806fb4..5cdfb9063 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
@@ -49,13 +49,13 @@ object SparkShimsProxy extends Logger {
private[this] lazy val SPARK_SHIMS_PREFIX = "streampark-spark-shims_spark"
def proxy[T](sparkVersion: SparkVersion, func: ClassLoader => T): T = {
- val shimsClassLoader = getSParkShimsClassLoader(sparkVersion)
+ val shimsClassLoader = getSparkShimsClassLoader(sparkVersion)
ClassLoaderUtils
.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
}
def proxy[T](sparkVersion: SparkVersion, func: JavaFunc[ClassLoader, T]): T
= {
- val shimsClassLoader = getSParkShimsClassLoader(sparkVersion)
+ val shimsClassLoader = getSparkShimsClassLoader(sparkVersion)
ClassLoaderUtils
.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
}
@@ -134,14 +134,14 @@ object SparkShimsProxy extends Logger {
.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
}
- private[this] def getSParkShimsClassLoader(sparkVersion: SparkVersion):
ClassLoader = {
+ private[this] def getSparkShimsClassLoader(sparkVersion: SparkVersion):
ClassLoader = {
logInfo(s"add spark shims urls classloader,spark version: $sparkVersion")
SHIMS_CLASS_LOADER_CACHE.getOrElseUpdate(
s"${sparkVersion.fullVersion}", {
// 1) spark/lib
- val libURL = getSparkHomeLib(sparkVersion.sparkHome, "jars",
!_.getName.startsWith("log4j"))
- val shimsUrls = ListBuffer[URL](libURL: _*)
+ val libUrl = getSparkHomeLib(sparkVersion.sparkHome, "jars", f =>
!f.getName.startsWith("log4j") && !f.getName.startsWith("slf4j"))
+ val shimsUrls = ListBuffer[URL](libUrl: _*)
// 2) add all shims jar
addShimsUrls(
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
index 55e4fe081..437bf0ff2 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
@@ -38,12 +38,12 @@ object SparkClientEndpoint {
}
}
- def cancel(cancelRequest: CancelRequest): CancelResponse = {
- clients.get(cancelRequest.executionMode) match {
- case Some(client) => client.cancel(cancelRequest)
+ def stop(stopRequest: StopRequest): StopResponse = {
+ clients.get(stopRequest.executionMode) match {
+ case Some(client) => client.stop(stopRequest)
case _ =>
throw new UnsupportedOperationException(
- s"Unsupported ${cancelRequest.executionMode} cancel ")
+ s"Unsupported ${stopRequest.executionMode} stop ")
}
}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
index 8f0a00e9a..9870799c0 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
@@ -18,17 +18,19 @@
package org.apache.streampark.spark.client.impl
import org.apache.streampark.common.conf.Workspace
+import org.apache.streampark.common.enums.SparkExecutionMode
+import org.apache.streampark.common.util.HadoopUtils
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
import org.apache.streampark.spark.client.`trait`.SparkClientTrait
import org.apache.streampark.spark.client.bean._
+import org.apache.streampark.spark.client.conf.SparkConfiguration
import org.apache.commons.collections.MapUtils
+import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
import java.util.concurrent.{CountDownLatch, Executors, ExecutorService}
-import scala.util.control.Breaks.break
-
/** yarn application mode submit */
object YarnApplicationClient extends SparkClientTrait {
@@ -36,7 +38,8 @@ object YarnApplicationClient extends SparkClientTrait {
private[this] lazy val workspace = Workspace.remote
- override def doCancel(cancelRequest: CancelRequest): CancelResponse = {
+ override def doStop(stopRequest: StopRequest): StopResponse = {
+
HadoopUtils.yarnClient.killApplication(ApplicationId.fromString(stopRequest.jobId))
null
}
@@ -44,11 +47,9 @@ object YarnApplicationClient extends SparkClientTrait {
override def doSubmit(submitRequest: SubmitRequest): SubmitResponse = {
launch(submitRequest)
- null
-
}
- private def launch(submitRequest: SubmitRequest): Unit = {
+ private def launch(submitRequest: SubmitRequest): SubmitResponse = {
val launcher: SparkLauncher = new SparkLauncher()
.setSparkHome(submitRequest.sparkVersion.sparkHome)
.setAppResource(submitRequest.buildResult
@@ -56,69 +57,61 @@ object YarnApplicationClient extends SparkClientTrait {
.shadedJarPath)
.setMainClass(submitRequest.appMain)
.setMaster("yarn")
- .setDeployMode("cluster")
+ .setDeployMode(submitRequest.executionMode match {
+ case SparkExecutionMode.YARN_CLIENT => "client"
+ case SparkExecutionMode.YARN_CLUSTER => "cluster"
+ case _ =>
+ throw new IllegalArgumentException(
+ "[StreamPark][YarnApplicationClient] Yarn mode only support
\"client\" and \"cluster\".")
+
+ })
.setAppName(submitRequest.appName)
- .setConf("spark.executor.memory", "5g")
- .setConf("spark.executor.cores", "4")
- .setConf("spark.num.executors", "1")
.setConf(
"spark.yarn.jars",
submitRequest
- .asInstanceOf[SubmitRequest]
.hdfsWorkspace
.sparkLib + "/*.jar")
.setVerbose(true)
+ import scala.collection.JavaConverters._
+ setDynamicProperties(launcher, submitRequest.properties.asScala.toMap)
+
+ // TODO: Adds command line arguments for the application.
+ // launcher.addAppArgs()
+
if (MapUtils.isNotEmpty(submitRequest.extraParameter) &&
submitRequest.extraParameter
.containsKey("sql")) {
launcher.addAppArgs("--sql",
submitRequest.extraParameter.get("sql").toString)
}
- logger.info("The spark task start")
+ logger.info("[StreamPark][YarnApplicationClient] The spark task start")
+ val cdlForApplicationId: CountDownLatch = new CountDownLatch(1)
+ var sparkAppHandle: SparkAppHandle = null
threadPool.execute(new Runnable {
override def run(): Unit = {
try {
val countDownLatch: CountDownLatch = new CountDownLatch(1)
- val sparkAppHandle: SparkAppHandle =
- launcher.startApplication(new SparkAppHandle.Listener() {
- override def stateChanged(handle: SparkAppHandle): Unit = {
- if (handle.getAppId != null) {
- logInfo(
- String.format("%s stateChanged :%s", handle.getAppId,
handle.getState.toString))
- } else logger.info("stateChanged :{}",
handle.getState.toString)
-
- if (SparkAppHandle.State.FAILED.toString ==
handle.getState.toString) {
- logger.error("Task run failure stateChanged :{}",
handle.getState.toString)
+ sparkAppHandle = launcher.startApplication(new
SparkAppHandle.Listener() {
+ override def stateChanged(handle: SparkAppHandle): Unit = {
+ if (handle.getAppId != null) {
+ if (cdlForApplicationId.getCount != 0) {
+ cdlForApplicationId.countDown()
}
+ logger.info("{} stateChanged :{}", Array(handle.getAppId,
handle.getState.toString))
+ } else logger.info("stateChanged :{}", handle.getState.toString)
- if (handle.getState.isFinal) countDownLatch.countDown()
+ if (SparkAppHandle.State.FAILED.toString ==
handle.getState.toString) {
+ logger.error("Task run failure stateChanged :{}",
handle.getState.toString)
}
- override def infoChanged(handle: SparkAppHandle): Unit = {}
- })
- logger.info(
- "The task is executing, current is get application id
before,please wait ........")
- var applicationId: String = null
- while ({
- !(SparkAppHandle.State.RUNNING == sparkAppHandle.getState)
- }) {
- applicationId = sparkAppHandle.getAppId
- if (applicationId != null) {
- logInfo(
- String.format(
- "handle current state is %s, appid is %s",
- sparkAppHandle.getState.toString,
- applicationId))
- break // todo: break is not supported
-
+ if (handle.getState.isFinal) {
+ countDownLatch.countDown()
+ }
}
- }
- logInfo(
- String.format(
- "handle current state is %s, appid is %s",
- sparkAppHandle.getState.toString,
- applicationId))
+
+ override def infoChanged(handle: SparkAppHandle): Unit = {}
+ })
countDownLatch.await()
} catch {
case e: Exception =>
@@ -127,6 +120,23 @@ object YarnApplicationClient extends SparkClientTrait {
}
})
+ cdlForApplicationId.await()
+ logger.info(
+ "[StreamPark][YarnApplicationClient] The task is executing, handle
current state is {}, appid is {}",
+ Array(sparkAppHandle.getState.toString, sparkAppHandle.getAppId))
+ SubmitResponse(null, null, sparkAppHandle.getAppId)
+ }
+
+ private def setDynamicProperties(sparkLauncher: SparkLauncher, properties:
Map[String, Any]): Unit = {
+ logger.info("[StreamPark][YarnApplicationClient] Spark launcher start
configuration.")
+ val finalProperties: Map[String, Any] =
SparkConfiguration.defaultParameters ++ properties
+ for ((k, v) <- finalProperties) {
+ if (k.startsWith("spark.")) {
+ sparkLauncher.setConf(k, v.toString)
+ } else {
+ logger.info("[StreamPark][YarnApplicationClient] \"{}\" doesn't start
with \"spark.\". Skip it.", k)
+ }
+ }
}
}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
index 75d6ea74d..93f32aad0 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
@@ -28,7 +28,7 @@ trait SparkClientTrait extends Logger {
def submit(submitRequest: SubmitRequest): SubmitResponse = {
logInfo(
s"""
- |--------------------------------------- spark job start
---------------------------------------
+ |--------------------------------------- spark job start
-----------------------------------
| userSparkHome : ${submitRequest.sparkVersion.sparkHome}
| sparkVersion : ${submitRequest.sparkVersion.version}
| appName : ${submitRequest.appName}
@@ -57,27 +57,25 @@ trait SparkClientTrait extends Logger {
def setConfig(submitRequest: SubmitRequest): Unit
@throws[Exception]
- def cancel(cancelRequest: CancelRequest): CancelResponse = {
+ def stop(stopRequest: StopRequest): StopResponse = {
logInfo(
s"""
- |----------------------------------------- spark job cancel
--------------------------------
- | userSparkHome : ${cancelRequest.sparkVersion.sparkHome}
- | sparkVersion : ${cancelRequest.sparkVersion.version}
- | clusterId : ${cancelRequest.clusterId}
- | withDrain : ${cancelRequest.withDrain}
- | nativeFormat : ${cancelRequest.nativeFormat}
- | appId : ${cancelRequest.clusterId}
- | jobId : ${cancelRequest.jobId}
+ |----------------------------------------- spark job stop
----------------------------------
+ | userSparkHome : ${stopRequest.sparkVersion.sparkHome}
+ | sparkVersion : ${stopRequest.sparkVersion.version}
+ | withDrain : ${stopRequest.withDrain}
+ | nativeFormat : ${stopRequest.nativeFormat}
+ | jobId : ${stopRequest.jobId}
|-------------------------------------------------------------------------------------------
|""".stripMargin)
- doCancel(cancelRequest)
+ doStop(stopRequest)
}
@throws[Exception]
def doSubmit(submitRequest: SubmitRequest): SubmitResponse
@throws[Exception]
- def doCancel(cancelRequest: CancelRequest): CancelResponse
+ def doStop(stopRequest: StopRequest): StopResponse
}