This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new af655696f [Feature] Support spark job status tracking (#3843)
af655696f is described below

commit af655696f3361946c792a32214908b3061675ba7
Author: lenoxzhao <[email protected]>
AuthorDate: Fri Jul 19 15:43:08 2024 +0800

    [Feature] Support spark job status tracking (#3843)
    
    * feat: add spark job state tracking
    
    * feat: adjust based on updated code and add resource monitoring
    
    * fix: fix e2e build failure
    
    * feature: support configuring spark parameters
    
    * feature: change cancel operation to stop operation
    
    * fix: modify comment
    
    * improve: remove flink related feature in SparkApplicationInfoService
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../main/assembly/script/schema/mysql-schema.sql   |  19 +
 .../base/config/AsyncExecutorPoolConfig.java       |  16 +
 .../console/core/bean/AlertTemplate.java           |  33 ++
 .../controller/SparkApplicationController.java     |  14 +-
 .../console/core/entity/SparkApplication.java      |  16 +-
 .../console/core/entity/SparkApplicationLog.java   |  51 +++
 .../console/core/enums/SparkAppStateEnum.java      |  74 ++--
 .../console/core/enums/SparkOperationEnum.java     |  27 +-
 .../console/core/enums/SparkOptionStateEnum.java   |  39 ++-
 .../core/mapper/SparkApplicationLogMapper.java     |   9 +-
 .../streampark/console/core/metrics/spark/Job.java |  67 ++++
 .../console/core/metrics/spark/SparkExecutor.java  |  16 +-
 .../core/service/SparkApplicationLogService.java   |  44 +++
 .../application/SparkApplicationActionService.java |   8 +-
 .../application/SparkApplicationInfoService.java   |  33 --
 .../impl/SparkApplicationActionServiceImpl.java    | 246 +++++--------
 .../impl/SparkApplicationInfoServiceImpl.java      | 137 +-------
 .../service/impl/SparkAppBuildPipeServiceImpl.java |  20 +-
 .../impl/SparkApplicationLogServiceImpl.java       |  57 +++
 .../console/core/watcher/SparkAppHttpWatcher.java  | 389 +++++++++++++++++++++
 .../streampark/spark/client/SparkClient.scala      |   8 +-
 .../{CancelRequest.scala => StopRequest.scala}     |   3 +-
 .../{CancelResponse.scala => StopResponse.scala}   |   2 +-
 .../spark/client/bean/SubmitResponse.scala         |   1 +
 .../SparkConfiguration.scala}                      |  11 +-
 .../spark/client/proxy/SparkShimsProxy.scala       |  10 +-
 .../spark/client/SparkClientEndpoint.scala         |   8 +-
 .../spark/client/impl/YarnApplicationClient.scala  | 102 +++---
 .../spark/client/trait/SparkClientTrait.scala      |  22 +-
 29 files changed, 964 insertions(+), 518 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 41495bc71..69abc8634 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -632,4 +632,23 @@ create table `t_spark_app` (
   index `inx_team` (`team_id`) using btree
 ) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;
 
+
+-- ----------------------------
+-- table structure for t_spark_log
+-- ----------------------------
+drop table if exists `t_spark_log`;
+create table `t_spark_log` (
+  `id` bigint not null auto_increment,
+  `app_id` bigint default null,
+  `spark_app_id` varchar(64) collate utf8mb4_general_ci default null,
+  `track_url` varchar(255) collate utf8mb4_general_ci default null,
+  `success` tinyint default null,
+  `exception` text collate utf8mb4_general_ci,
+  `option_time` datetime default null,
+  `option_name` tinyint default null,
+  `user_id` bigint default null,
+  primary key (`id`) using btree
+) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;
+
+
 set foreign_key_checks = 1;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
index 98d9f3f9c..5d51b3df1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
@@ -66,6 +66,22 @@ public class AsyncExecutorPoolConfig extends AsyncConfigurerSupport {
             ThreadUtils.threadFactory("flink-restapi-watching-executor-"));
     }
 
+    /**
+     * Create a ThreadPoolExecutor for SparkAppHttpWatcher.
+     *
+     * @return Executor
+     */
+    @Bean("sparkRestAPIWatchingExecutor")
+    public Executor sparkRestAPIWatchingExecutor() {
+        return new ThreadPoolExecutor(
+            Runtime.getRuntime().availableProcessors() * 5,
+            Runtime.getRuntime().availableProcessors() * 10,
+            60L,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(1024),
+            ThreadUtils.threadFactory("spark-cluster-watching-executor-"));
+    }
+
     /**
      * Create a ThreadPoolTaskExecutor for FlinkClusterWatcher.
      *
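
A minimal sketch of how the new pool might be consumed (the component below is hypothetical and not part of this commit; it only assumes standard Spring injection by the bean name defined above):

    import java.util.concurrent.Executor;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Component;

    @Component
    public class SparkWatchTaskRunner {

        /** Pick up the bounded pool registered above by its bean name. */
        @Qualifier("sparkRestAPIWatchingExecutor")
        @Autowired
        private Executor executorService;

        /** Run one polling round off the caller's thread. */
        public void submit(Runnable pollTask) {
            executorService.execute(pollTask);
        }
    }
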
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index dee7241b3..ce24a1e9f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -19,12 +19,15 @@ package org.apache.streampark.console.core.bean;
 
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.FlinkExecutionMode;
+import org.apache.streampark.common.enums.SparkExecutionMode;
 import org.apache.streampark.common.util.DateUtils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.SparkApplication;
 import org.apache.streampark.console.core.enums.CheckPointStatusEnum;
 import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
+import org.apache.streampark.console.core.enums.SparkAppStateEnum;
 
 import lombok.Data;
 
@@ -140,6 +143,25 @@ public class AlertTemplate implements Serializable {
             .build();
     }
 
+    public static AlertTemplate of(SparkApplication application, SparkAppStateEnum appState) {
+        return new AlertTemplateBuilder()
+            .setDuration(application.getStartTime(), application.getEndTime())
+            .setJobName(application.getJobName())
+            .setLink(application.getSparkExecutionMode(), application.getJobId())
+            .setStartTime(application.getStartTime())
+            .setEndTime(application.getEndTime())
+            .setRestart(application.isNeedRestartOnFailed(), application.getRestartCount())
+            .setRestartIndex(application.getRestartCount())
+            .setTotalRestart(application.getRestartSize())
+            .setType(1)
+            .setTitle(
+                String.format(
+                    "%s %s %s", ALERT_TITLE_PREFIX, application.getJobName(), 
appState.name()))
+            .setSubject(
+                String.format("%s %s %s", ALERT_SUBJECT_PREFIX, 
application.getJobName(), appState))
+            .setStatus(appState.name())
+            .build();
+    }
     private static class AlertTemplateBuilder {
 
         private final AlertTemplate alertTemplate = new AlertTemplate();
@@ -218,6 +240,17 @@ public class AlertTemplate implements Serializable {
             return this;
         }
 
+        public AlertTemplateBuilder setLink(SparkExecutionMode mode, String appId) {
+            if (SparkExecutionMode.isYarnMode(mode)) {
+                String format = "%s/proxy/%s/";
+                String url = String.format(format, YarnUtils.getRMWebAppURL(false), appId);
+                alertTemplate.setLink(url);
+            } else {
+                alertTemplate.setLink(null);
+            }
+            return this;
+        }
+
         public AlertTemplateBuilder setCpFailureRateInterval(String cpFailureRateInterval) {
             alertTemplate.setCpFailureRateInterval(cpFailureRateInterval);
             return this;
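
A usage sketch for the new of(SparkApplication, SparkAppStateEnum) overload (hypothetical call site; the alert dispatch line is an assumption mirroring the Flink alert path, not code from this commit):

    // Build the alert payload for a Spark job that reached a terminal state.
    AlertTemplate template = AlertTemplate.of(application, SparkAppStateEnum.FAILED);
    // A watcher would then hand it to the alert subsystem, e.g.:
    // alertService.alert(application.getAlertId(), template);
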
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
index 4063710f6..2b413c760 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
@@ -24,12 +24,12 @@ import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.base.exception.InternalException;
 import org.apache.streampark.console.core.annotation.AppUpdated;
 import org.apache.streampark.console.core.entity.ApplicationBackUp;
-import org.apache.streampark.console.core.entity.ApplicationLog;
 import org.apache.streampark.console.core.entity.SparkApplication;
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
 import org.apache.streampark.console.core.enums.AppExistsStateEnum;
 import org.apache.streampark.console.core.service.ApplicationBackUpService;
-import org.apache.streampark.console.core.service.ApplicationLogService;
 import org.apache.streampark.console.core.service.ResourceService;
+import org.apache.streampark.console.core.service.SparkApplicationLogService;
 import org.apache.streampark.console.core.service.application.SparkApplicationActionService;
 import org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
 import org.apache.streampark.console.core.service.application.SparkApplicationManageService;
@@ -70,7 +70,7 @@ public class SparkApplicationController {
     private ApplicationBackUpService backUpService;
 
     @Autowired
-    private ApplicationLogService applicationLogService;
+    private SparkApplicationLogService applicationLogService;
 
     @Autowired
     private ResourceService resourceService;
@@ -153,8 +153,8 @@ public class SparkApplicationController {
 
     @PostMapping(value = "cancel")
     @RequiresPermissions("app:cancel")
-    public RestResponse cancel(SparkApplication app) throws Exception {
-        applicationActionService.cancel(app);
+    public RestResponse stop(SparkApplication app) throws Exception {
+        applicationActionService.stop(app);
         return RestResponse.success();
     }
 
@@ -209,8 +209,8 @@ public class SparkApplicationController {
     }
 
     @PostMapping("optionlog")
-    public RestResponse optionlog(ApplicationLog applicationLog, RestRequest request) {
-        IPage<ApplicationLog> applicationList = applicationLogService.getPage(applicationLog, request);
+    public RestResponse optionlog(SparkApplicationLog applicationLog, RestRequest request) {
+        IPage<SparkApplicationLog> applicationList = applicationLogService.getPage(applicationLog, request);
         return RestResponse.success(applicationList);
     }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
index 9c4be43e5..c07e2b160 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
@@ -32,6 +32,7 @@ import org.apache.streampark.console.core.bean.Dependency;
 import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
 import org.apache.streampark.console.core.enums.ReleaseStateEnum;
 import org.apache.streampark.console.core.enums.ResourceFromEnum;
+import org.apache.streampark.console.core.enums.SparkAppStateEnum;
 import org.apache.streampark.console.core.metrics.flink.JobsOverview;
 import org.apache.streampark.console.core.utils.YarnQueueLabelExpression;
 import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates;
@@ -285,19 +286,16 @@ public class SparkApplication extends BaseEntity {
     }
 
     /**
-     * Determine if a FlinkAppState requires tracking.
+     * Determine if a SparkAppState requires tracking.
      *
      * @return 1: need to be tracked | 0: no need to be tracked.
      */
     public Boolean shouldTracking() {
         switch (getStateEnum()) {
             case ADDED:
-            case CREATED:
             case FINISHED:
             case FAILED:
-            case CANCELED:
-            case TERMINATED:
-            case POS_TERMINATED:
+            case KILLED:
                 return false;
             default:
                 return true;
@@ -312,15 +310,11 @@ public class SparkApplication extends BaseEntity {
     public boolean isCanBeStart() {
         switch (getStateEnum()) {
             case ADDED:
-            case CREATED:
             case FAILED:
-            case CANCELED:
             case FINISHED:
             case LOST:
-            case TERMINATED:
             case SUCCEEDED:
             case KILLED:
-            case POS_TERMINATED:
                 return true;
             default:
                 return false;
@@ -338,8 +332,8 @@ public class SparkApplication extends BaseEntity {
     }
 
     @JsonIgnore
-    public FlinkAppStateEnum getStateEnum() {
-        return FlinkAppStateEnum.of(state);
+    public SparkAppStateEnum getStateEnum() {
+        return SparkAppStateEnum.of(state);
     }
 
     @JsonIgnore
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationLog.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationLog.java
new file mode 100644
index 000000000..8c51c92ed
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplicationLog.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Date;
+
+@Data
+@TableName("t_spark_log")
+@Slf4j
+public class SparkApplicationLog {
+
+    @TableId(type = IdType.AUTO)
+    private Long id;
+    /** appId */
+    private Long appId;
+    /** applicationId */
+    private String sparkAppId;
+    /** tracking url of current spark application */
+    private String trackUrl;
+    /** start status */
+    private Boolean success;
+    /** option name */
+    private Integer optionName;
+    /** option time */
+    private Date optionTime;
+    /** exception at the start */
+    private String exception;
+    /** The user who operates the application */
+    private Long userId;
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkAppStateEnum.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkAppStateEnum.java
index 822e32ba6..9e9847670 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkAppStateEnum.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkAppStateEnum.java
@@ -26,75 +26,47 @@ public enum SparkAppStateEnum {
     /** Added new job to database. */
     ADDED(0),
 
-    /**
-     * The job has been received by the Dispatcher, and is waiting for the job manager to be created.
-     */
-    INITIALIZING(1),
+    /** (From Yarn) Application which was just created. */
+    NEW(1),
 
-    /** Job is newly created, no task has started to run. */
-    CREATED(2),
+    /** (From Yarn) Application which is being saved. */
+    NEW_SAVING(2),
 
     /** Application which is currently running. */
     STARTING(3),
 
-    /** Application which is currently running. */
-    RESTARTING(4),
+    /** (From Yarn) Application which has been submitted. */
+    SUBMITTED(4),
 
-    /** Some tasks are scheduled or running, some may be pending, some may be finished. */
-    RUNNING(5),
+    /** (From Yarn) Application has been accepted by the scheduler. */
+    ACCEPTED(5),
 
     /** The job has failed and is currently waiting for the cleanup to complete. */
-    FAILING(6),
-
-    /** The job has failed with a non-recoverable task failure. */
-    FAILED(7),
-
-    /** Job is being cancelled. */
-    CANCELLING(8),
-
-    /** Job has been cancelled. */
-    CANCELED(9),
+    RUNNING(6),
 
-    /** All the job's tasks have successfully finished. */
-    FINISHED(10),
+    /** (From Yarn) Application which finished successfully. */
+    FINISHED(7),
 
-    /**
-     * The job has been suspended which means that it has been stopped but not been removed from a
-     * potential HA job store.
-     */
-    SUSPENDED(11),
-
-    /** The job is currently reconciling and waits for task execution report to recover state. */
-    RECONCILING(12),
+    /** (From Yarn) Application which failed. */
+    FAILED(8),
 
     /** Loss of mapping. */
-    LOST(13),
+    LOST(9),
 
     /** Mapping. */
-    MAPPING(14),
+    MAPPING(10),
 
     /** Other statuses. */
-    OTHER(15),
+    OTHER(11),
 
     /** Has rollback. */
-    REVOKED(16),
-
-    /**
-     * Lost track of Spark job temporarily. A complete loss of Spark job tracking translates into LOST
-     * state.
-     */
-    @Deprecated
-    SILENT(17),
-
-    /** Spark job has terminated vaguely, maybe FINISHED, CANCELED or FAILED. */
-    TERMINATED(18),
+    REVOKED(12),
 
-    /** Spark job has terminated vaguely, maybe FINISHED, CANCELED or FAILED. */
-    @Deprecated
-    POS_TERMINATED(19),
+    /** Spark job is being stopped (killed) by StreamPark. */
+    STOPPING(13),
 
     /** Job SUCCEEDED on yarn. */
-    SUCCEEDED(20),
+    SUCCEEDED(14),
 
     /** Has killed in Yarn. */
     KILLED(-9);
@@ -125,13 +97,11 @@ public enum SparkAppStateEnum {
 
     public static boolean isEndState(Integer appState) {
         SparkAppStateEnum sparkAppStateEnum = SparkAppStateEnum.of(appState);
-        return SparkAppStateEnum.CANCELED == sparkAppStateEnum
-            || SparkAppStateEnum.FAILED == sparkAppStateEnum
+        return SparkAppStateEnum.FAILED == sparkAppStateEnum
             || SparkAppStateEnum.KILLED == sparkAppStateEnum
             || SparkAppStateEnum.FINISHED == sparkAppStateEnum
             || SparkAppStateEnum.SUCCEEDED == sparkAppStateEnum
-            || SparkAppStateEnum.LOST == sparkAppStateEnum
-            || SparkAppStateEnum.TERMINATED == sparkAppStateEnum;
+            || SparkAppStateEnum.LOST == sparkAppStateEnum;
     }
 
     public static boolean isLost(Integer appState) {
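
The "(From Yarn)" values above line up one-to-one with org.apache.hadoop.yarn.api.records.YarnApplicationState, so a watcher can translate a YARN application report directly. A sketch under that assumption (the helper class is hypothetical; this commit does not spell the mapping out here):

    import org.apache.hadoop.yarn.api.records.YarnApplicationState;

    public final class YarnStateMapping {

        /** Translate a YARN report state into the tracking enum above. */
        public static SparkAppStateEnum toSparkState(YarnApplicationState state) {
            switch (state) {
                case NEW:        return SparkAppStateEnum.NEW;
                case NEW_SAVING: return SparkAppStateEnum.NEW_SAVING;
                case SUBMITTED:  return SparkAppStateEnum.SUBMITTED;
                case ACCEPTED:   return SparkAppStateEnum.ACCEPTED;
                case RUNNING:    return SparkAppStateEnum.RUNNING;
                case FINISHED:   return SparkAppStateEnum.FINISHED;
                case FAILED:     return SparkAppStateEnum.FAILED;
                case KILLED:     return SparkAppStateEnum.KILLED;
                default:         return SparkAppStateEnum.OTHER;
            }
        }
    }
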
diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOperationEnum.java
similarity index 63%
copy from streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
copy to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOperationEnum.java
index d2582dd53..4d9ee1ae2 100644
--- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOperationEnum.java
@@ -15,14 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.spark.client.bean
+package org.apache.streampark.console.core.enums;
 
-import javax.annotation.Nullable
+import lombok.Getter;
 
-import java.util.{Map => JavaMap}
+import java.util.Arrays;
 
-case class SubmitResponse(
-    clusterId: String,
-    sparkConfig: JavaMap[String, String],
-    @Nullable jobId: String = "",
-    @Nullable jobManagerUrl: String = "")
+/** Spark Operation type */
+@Getter
+public enum SparkOperationEnum {
+
+    RELEASE(0), START(1), STOP(2);
+
+    private final int value;
+
+    SparkOperationEnum(int value) {
+        this.value = value;
+    }
+
+    public static SparkOperationEnum of(Integer option) {
+        return Arrays.stream(values()).filter((x) -> x.value == option).findFirst().orElse(null);
+    }
+}
diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOptionStateEnum.java
similarity index 52%
copy from streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
copy to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOptionStateEnum.java
index 6b8e1bfa2..b3a3e03e8 100644
--- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/SparkOptionStateEnum.java
@@ -15,21 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.spark.client.bean
+package org.apache.streampark.console.core.enums;
 
-import org.apache.streampark.common.conf.SparkVersion
-import org.apache.streampark.common.enums.SparkExecutionMode
+import lombok.Getter;
 
-import javax.annotation.Nullable
+import java.util.Arrays;
 
-import java.util.{Map => JavaMap}
+/** Option status */
+@Getter
+public enum SparkOptionStateEnum {
 
-case class CancelRequest(
-    id: Long,
-    sparkVersion: SparkVersion,
-    executionMode: SparkExecutionMode,
-    @Nullable properties: JavaMap[String, Any],
-    clusterId: String,
-    jobId: String,
-    withDrain: Boolean,
-    nativeFormat: Boolean)
+    /** Application which is currently action: none. */
+    NONE(0),
+    /** Application which is currently action: releasing. */
+    RELEASING(1),
+    /** Application which is currently action: starting. */
+    STARTING(2),
+    /** Application which is currently action: stopping. */
+    STOPPING(3);
+
+    private final int value;
+
+    SparkOptionStateEnum(int value) {
+        this.value = value;
+    }
+
+    public static SparkOptionStateEnum of(Integer state) {
+        return Arrays.stream(values()).filter((x) -> x.value == state).findFirst().orElse(null);
+    }
+}
diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkApplicationLogMapper.java
similarity index 75%
copy from streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
copy to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkApplicationLogMapper.java
index d293947da..c9e77768a 100644
--- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkApplicationLogMapper.java
@@ -15,6 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.spark.client.bean
+package org.apache.streampark.console.core.mapper;
 
-case class CancelResponse(savePointDir: String)
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+public interface SparkApplicationLogMapper extends BaseMapper<SparkApplicationLog> {
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/Job.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/Job.java
new file mode 100644
index 000000000..be28f6631
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/Job.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.metrics.spark;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+import scala.collection.Map;
+
+@Data
+public class Job implements Serializable {
+
+    @JsonProperty("jobId")
+    private Long id;
+
+    private String name;
+
+    private String submissionTime;
+
+    private String completionTime;
+
+    private List<Long> stageIds;
+
+    private String status;
+
+    private Integer numTasks;
+
+    private Integer numActiveTasks;
+
+    private Integer numCompletedTasks;
+
+    private Integer numSkippedTasks;
+
+    private Integer numFailedTasks;
+
+    private Integer numKilledTasks;
+
+    private Integer numCompletedIndices;
+
+    private Integer numActiveStages;
+
+    private Integer numCompletedStages;
+
+    private Integer numSkippedStages;
+
+    private Integer numFailedStages;
+
+    private Map<String, Object> killedTasksSummary;
+}
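
This POJO appears to mirror the job objects served by Spark's monitoring REST API (GET /api/v1/applications/<app-id>/jobs). A deserialization sketch (assumptions: plain Jackson; Spark returns more fields than the POJO declares, so unknown properties are ignored; the scala.collection.Map field would additionally need jackson-module-scala registered):

    import java.util.List;

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;

    /** Sketch: parse the jobs endpoint payload into a list of the POJO above. */
    static List<Job> parseJobs(String jobsJson) throws Exception {
        ObjectMapper mapper = new ObjectMapper()
            // The REST payload carries more fields than this POJO declares.
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        return mapper.readValue(jobsJson, new TypeReference<List<Job>>() {});
    }
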
diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/SparkExecutor.java
similarity index 75%
copy from streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
copy to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/SparkExecutor.java
index d293947da..12e0c5e22 100644
--- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/SparkExecutor.java
@@ -15,6 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.spark.client.bean
+package org.apache.streampark.console.core.metrics.spark;
 
-case class CancelResponse(savePointDir: String)
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class SparkExecutor implements Serializable {
+
+    private Long memoryUsed;
+
+    private Long maxMemory;
+
+    private Long totalCores;
+}
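
A sketch of the resource-monitoring use hinted at in the commit message (assumptions: the executors come from GET /api/v1/applications/<app-id>/executors and are already deserialized; the aggregation helper itself is hypothetical):

    import java.util.List;

    /** Sum cluster-wide usage across all executors of one application. */
    static long[] aggregate(List<SparkExecutor> executors) {
        long memoryUsed = 0, maxMemory = 0, totalCores = 0;
        for (SparkExecutor e : executors) {
            // Guard against nulls, since the boxed fields default to null.
            memoryUsed += e.getMemoryUsed() == null ? 0 : e.getMemoryUsed();
            maxMemory += e.getMaxMemory() == null ? 0 : e.getMaxMemory();
            totalCores += e.getTotalCores() == null ? 0 : e.getTotalCores();
        }
        return new long[] {memoryUsed, maxMemory, totalCores};
    }
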
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java
new file mode 100644
index 000000000..6edee09ac
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/** This interface is used to record spark application operation logs */
+public interface SparkApplicationLogService extends IService<SparkApplicationLog> {
+
+    /**
+     * Retrieves a page of {@link SparkApplicationLog} objects based on the provided parameters.
+     *
+     * @param sparkApplicationLog The {@link SparkApplicationLog} object containing the search criteria.
+     * @param request The {@link RestRequest} object used for pagination and sorting.
+     * @return An {@link IPage} containing the retrieved {@link SparkApplicationLog} objects.
+     */
+    IPage<SparkApplicationLog> getPage(SparkApplicationLog sparkApplicationLog, RestRequest request);
+
+    /**
+     * remove application log by application id
+     *
+     * @param appId The id of the application to be removed
+     */
+    void removeByAppId(Long appId);
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java
index 3a1c086ca..682717cbe 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java
@@ -54,12 +54,12 @@ public interface SparkApplicationActionService extends IService<SparkApplication
     void revoke(Long appId) throws ApplicationException;
 
     /**
-     * Cancels the given application. Throws an exception if cancellation fails.
+     * Stops the given application. Throws an exception if stop fails.
      *
-     * @param appParam the application to be canceled
-     * @throws Exception if cancellation fails
+     * @param appParam the application to be stopped
+     * @throws Exception if stop fails
      */
-    void cancel(SparkApplication appParam) throws Exception;
+    void stop(SparkApplication appParam) throws Exception;
 
     /**
      * Forces the given application to stop.
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationInfoService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationInfoService.java
index 81b9091e7..4100826c5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationInfoService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationInfoService.java
@@ -87,39 +87,6 @@ public interface SparkApplicationInfoService extends IService<SparkApplication>
      */
     boolean existsBySparkEnvId(Long sparkEnvId);
 
-    /**
-     * Checks if a job is running for a given cluster ID.
-     *
-     * @param clusterId The ID of the cluster.
-     * @return true if a job is running for the given cluster ID; otherwise, false.
-     */
-    boolean existsRunningByClusterId(Long clusterId);
-
-    /**
-     * Checks if there is a job that is associated with the given cluster ID.
-     *
-     * @param clusterId The ID of the cluster.
-     * @return True if a job exists for the given cluster ID, false otherwise.
-     */
-    boolean existsByClusterId(Long clusterId);
-
-    /**
-     * Counts the number of items associated with the given cluster ID.
-     *
-     * @param clusterId The ID of the cluster.
-     * @return The number of items associated with the given cluster ID.
-     */
-    Integer countByClusterId(Long clusterId);
-
-    /**
-     * Counts the number of items associated with the given cluster ID and database type.
-     *
-     * @param clusterId The ID of the cluster.
-     * @param dbType The type of the database.
-     * @return The number of items associated with the given cluster ID and database type.
-     */
-    Integer countAffectedByClusterId(Long clusterId, String dbType);
-
     /**
      * Gets the YARN name for the given application.
      *
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index 46e447c75..4ddb22c51 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -21,9 +21,7 @@ import org.apache.streampark.common.Constant;
 import org.apache.streampark.common.conf.ConfigKeys;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ApplicationType;
-import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.FlinkDevelopmentMode;
-import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.enums.SparkExecutionMode;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.AssertUtils;
@@ -31,52 +29,46 @@ import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
 import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApplicationException;
 import org.apache.streampark.console.core.entity.AppBuildPipeline;
 import org.apache.streampark.console.core.entity.ApplicationConfig;
-import org.apache.streampark.console.core.entity.ApplicationLog;
-import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Resource;
-import org.apache.streampark.console.core.entity.SavePoint;
 import org.apache.streampark.console.core.entity.SparkApplication;
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
 import org.apache.streampark.console.core.entity.SparkEnv;
-import org.apache.streampark.console.core.enums.CheckPointTypeEnum;
 import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
-import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
-import org.apache.streampark.console.core.enums.OperationEnum;
-import org.apache.streampark.console.core.enums.OptionStateEnum;
 import org.apache.streampark.console.core.enums.ReleaseStateEnum;
+import org.apache.streampark.console.core.enums.SparkAppStateEnum;
+import org.apache.streampark.console.core.enums.SparkOperationEnum;
+import org.apache.streampark.console.core.enums.SparkOptionStateEnum;
 import org.apache.streampark.console.core.mapper.SparkApplicationMapper;
 import org.apache.streampark.console.core.service.AppBuildPipeService;
 import org.apache.streampark.console.core.service.ApplicationConfigService;
-import org.apache.streampark.console.core.service.ApplicationLogService;
-import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.ResourceService;
 import org.apache.streampark.console.core.service.ServiceHelper;
+import org.apache.streampark.console.core.service.SparkApplicationLogService;
 import org.apache.streampark.console.core.service.SparkEnvService;
 import org.apache.streampark.console.core.service.VariableService;
 import org.apache.streampark.console.core.service.application.SparkApplicationActionService;
 import org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
-import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
-import org.apache.streampark.console.core.watcher.FlinkClusterWatcher;
+import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher;
 import org.apache.streampark.flink.packer.pipeline.BuildResult;
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
 import org.apache.streampark.spark.client.SparkClient;
-import org.apache.streampark.spark.client.bean.CancelRequest;
-import org.apache.streampark.spark.client.bean.CancelResponse;
+import org.apache.streampark.spark.client.bean.StopRequest;
+import org.apache.streampark.spark.client.bean.StopResponse;
 import org.apache.streampark.spark.client.bean.SubmitRequest;
 import org.apache.streampark.spark.client.bean.SubmitResponse;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -91,7 +83,6 @@ import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
 
 import java.io.File;
-import java.net.URI;
 import java.util.Date;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -104,6 +95,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
+import static org.apache.hadoop.service.Service.STATE.STARTED;
+
 @Slf4j
 @Service
 public class SparkApplicationActionServiceImpl
@@ -123,7 +116,7 @@ public class SparkApplicationActionServiceImpl
     private ApplicationConfigService configService;
 
     @Autowired
-    private ApplicationLogService applicationLogService;
+    private SparkApplicationLogService applicationLogService;
 
     @Autowired
     private SparkEnvService sparkEnvService;
@@ -137,21 +130,15 @@ public class SparkApplicationActionServiceImpl
     @Autowired
     private AppBuildPipeService appBuildPipeService;
 
-    @Autowired
-    private FlinkClusterService flinkClusterService;
-
     @Autowired
     private VariableService variableService;
 
     @Autowired
     private ResourceService resourceService;
 
-    @Autowired
-    private FlinkClusterWatcher flinkClusterWatcher;
-
     private final Map<Long, CompletableFuture<SubmitResponse>> startFutureMap = new ConcurrentHashMap<>();

-    private final Map<Long, CompletableFuture<CancelResponse>> cancelFutureMap = new ConcurrentHashMap<>();
+    private final Map<Long, CompletableFuture<StopResponse>> stopFutureMap = new ConcurrentHashMap<>();
 
     @Override
     public void revoke(Long appId) throws ApplicationException {
@@ -174,104 +161,75 @@ public class SparkApplicationActionServiceImpl
             updateWrapper.set(SparkApplication::getRelease, ReleaseStateEnum.NEED_RELEASE.get());
         }
         if (!application.isRunning()) {
-            updateWrapper.set(SparkApplication::getState, FlinkAppStateEnum.REVOKED.getValue());
+            updateWrapper.set(SparkApplication::getState, SparkAppStateEnum.REVOKED.getValue());
         }
         baseMapper.update(null, updateWrapper);
     }
 
     @Override
     public void restart(SparkApplication appParam) throws Exception {
-        this.cancel(appParam);
+        this.stop(appParam);
         this.start(appParam, false);
     }
 
     @Override
     public void forcedStop(Long id) {
         CompletableFuture<SubmitResponse> startFuture = startFutureMap.remove(id);
-        CompletableFuture<CancelResponse> cancelFuture = cancelFutureMap.remove(id);
+        CompletableFuture<StopResponse> stopFuture = stopFutureMap.remove(id);
         SparkApplication application = this.baseMapper.selectApp(id);
         if (startFuture != null) {
             startFuture.cancel(true);
         }
-        if (cancelFuture != null) {
-            cancelFuture.cancel(true);
+        if (stopFuture != null) {
+            stopFuture.cancel(true);
         }
-        if (startFuture == null && cancelFuture == null) {
+        if (startFuture == null && stopFuture == null) {
             this.doStopped(id);
         }
     }
 
     @Override
-    public void cancel(SparkApplication appParam) throws Exception {
-        FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.CANCELLING);
+    public void stop(SparkApplication appParam) throws Exception {
+        SparkAppHttpWatcher.setOptionState(appParam.getId(), SparkOptionStateEnum.STOPPING);
         SparkApplication application = getById(appParam.getId());
-        application.setState(FlinkAppStateEnum.CANCELLING.getValue());
+        application.setState(SparkAppStateEnum.STOPPING.getValue());
 
-        ApplicationLog applicationLog = new ApplicationLog();
-        applicationLog.setOptionName(OperationEnum.CANCEL.getValue());
+        SparkApplicationLog applicationLog = new SparkApplicationLog();
+        applicationLog.setOptionName(SparkOperationEnum.STOP.getValue());
         applicationLog.setAppId(application.getId());
-        applicationLog.setJobManagerUrl(application.getJobManagerUrl());
+        applicationLog.setTrackUrl(application.getJobManagerUrl());
         applicationLog.setOptionTime(new Date());
-        applicationLog.setYarnAppId(application.getClusterId());
+        applicationLog.setSparkAppId(application.getJobId());
         applicationLog.setUserId(serviceHelper.getUserId());
-
-        if (appParam.getSavePointed()) {
-            FlinkAppHttpWatcher.addSavepoint(application.getId());
-            application.setOptionState(OptionStateEnum.SAVEPOINTING.getValue());
-        } else {
-            application.setOptionState(OptionStateEnum.CANCELLING.getValue());
-        }
-
         application.setOptionTime(new Date());
         this.baseMapper.updateById(application);
 
         Long userId = serviceHelper.getUserId();
         if (!application.getUserId().equals(userId)) {
-            FlinkAppHttpWatcher.addCanceledApp(application.getId(), userId);
+            SparkAppHttpWatcher.addCanceledApp(application.getId(), userId);
         }
 
         SparkEnv sparkEnv = sparkEnvService.getById(application.getVersionId());
 
-        String clusterId = null;
-        if (SparkExecutionMode.isYarnMode(application.getExecutionMode())) {
-            clusterId = application.getAppId();
-        }
-
         Map<String, Object> properties = new HashMap<>();
 
-        if (SparkExecutionMode.isRemoteMode(application.getSparkExecutionMode())) {
-            FlinkCluster cluster = flinkClusterService.getById(application.getSparkClusterId());
-            ApiAlertException.throwIfNull(
-                cluster,
-                String.format(
-                    "The clusterId=%s cannot be find, maybe the clusterId is 
wrong or "
-                        + "the cluster has been deleted. Please contact the 
Admin.",
-                    application.getSparkClusterId()));
-            URI activeAddress = cluster.getRemoteURI();
-            properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
-            properties.put(RestOptions.PORT.key(), activeAddress.getPort());
-        }
-
-        CancelRequest cancelRequest = new CancelRequest(
-            application.getId(),
-            sparkEnv.getSparkVersion(),
-            SparkExecutionMode.of(application.getExecutionMode()),
-            properties,
-            clusterId,
-            application.getJobId(),
-            appParam.getDrain(),
-            appParam.getNativeFormat());
-
-        final Date triggerTime = new Date();
-        CompletableFuture<CancelResponse> cancelFuture = CompletableFuture
-            .supplyAsync(() -> SparkClient.cancel(cancelRequest), executorService);
-
-        cancelFutureMap.put(application.getId(), cancelFuture);
-
-        cancelFuture.whenComplete(
+        StopRequest stopRequest =
+            new StopRequest(
+                application.getId(),
+                sparkEnv.getSparkVersion(),
+                SparkExecutionMode.of(application.getExecutionMode()),
+                properties,
+                application.getJobId(),
+                appParam.getDrain(),
+                appParam.getNativeFormat());
+
+        CompletableFuture<StopResponse> stopFuture =
+            CompletableFuture.supplyAsync(() -> SparkClient.stop(stopRequest), executorService);
+
+        stopFutureMap.put(application.getId(), stopFuture);
+        stopFuture.whenComplete(
             (cancelResponse, throwable) -> {
-                cancelFutureMap.remove(application.getId());
-
+                stopFutureMap.remove(application.getId());
                 if (throwable != null) {
                     String exception = ExceptionUtils.stringifyException(throwable);
                     applicationLog.setException(exception);
@@ -281,31 +239,17 @@ public class SparkApplicationActionServiceImpl
                     if (throwable instanceof CancellationException) {
                         doStopped(application.getId());
                     } else {
-                        log.error("stop flink job failed.", throwable);
-                        
application.setOptionState(OptionStateEnum.NONE.getValue());
-                        
application.setState(FlinkAppStateEnum.FAILED.getValue());
+                        log.error("stop spark job failed.", throwable);
+                        
application.setOptionState(SparkOptionStateEnum.NONE.getValue());
+                        
application.setState(SparkAppStateEnum.FAILED.getValue());
                         updateById(application);
-
-                        FlinkAppHttpWatcher.unWatching(application.getId());
+                        SparkAppHttpWatcher.unWatching(application.getId());
                     }
                     return;
                 }
-
                 applicationLog.setSuccess(true);
                 // save log...
                 applicationLogService.save(applicationLog);
-
-                if (cancelResponse != null && cancelResponse.savePointDir() != null) {
-                    String savePointDir = cancelResponse.savePointDir();
-                    log.info("savePoint path: {}", savePointDir);
-                    SavePoint savePoint = new SavePoint();
-                    savePoint.setPath(savePointDir);
-                    savePoint.setAppId(application.getId());
-                    savePoint.setLatest(true);
-                    savePoint.setType(CheckPointTypeEnum.SAVEPOINT.get());
-                    savePoint.setCreateTime(new Date());
-                    savePoint.setTriggerTime(triggerTime);
-                }
             });
     }
 
@@ -317,24 +261,16 @@ public class SparkApplicationActionServiceImpl
         ApiAlertException.throwIfTrue(
             !application.isCanBeStart(), "[StreamPark] The application cannot 
be started repeatedly.");
 
-        if (SparkExecutionMode.isRemoteMode(application.getSparkExecutionMode())) {
-            checkBeforeStart(application);
-        }
+        SparkEnv sparkEnv = sparkEnvService.getByIdOrDefault(application.getVersionId());
+        ApiAlertException.throwIfNull(sparkEnv, "[StreamPark] cannot find the spark version");
 
         if (SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) {
-
-            ApiAlertException.throwIfTrue(
-                !applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(),
-                "[StreamPark] The same task name is already running in the yarn queue");
+            checkYarnBeforeStart(application);
         }
 
         AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
         AssertUtils.notNull(buildPipeline);
 
-        SparkEnv sparkEnv = sparkEnvService.getByIdOrDefault(application.getVersionId());
-
-        ApiAlertException.throwIfNull(sparkEnv, "[StreamPark] can no found flink version");
-
         // if manually started, clear the restart flag
         if (!auto) {
             application.setRestartCount(0);
@@ -342,7 +278,6 @@ public class SparkApplicationActionServiceImpl
             if (!application.isNeedRestartOnFailed()) {
                 return;
             }
-            appParam.setSavePointed(true);
             application.setRestartCount(application.getRestartCount() + 1);
         }
 
@@ -350,8 +285,8 @@ public class SparkApplicationActionServiceImpl
         starting(application);
 
         String jobId = new JobID().toHexString();
-        ApplicationLog applicationLog = new ApplicationLog();
-        applicationLog.setOptionName(OperationEnum.START.getValue());
+        SparkApplicationLog applicationLog = new SparkApplicationLog();
+        applicationLog.setOptionName(SparkOperationEnum.START.getValue());
         applicationLog.setAppId(application.getId());
         applicationLog.setOptionTime(new Date());
         applicationLog.setUserId(serviceHelper.getUserId());
@@ -376,6 +311,7 @@ public class SparkApplicationActionServiceImpl
         if (SparkExecutionMode.YARN_CLUSTER == application.getSparkExecutionMode()
             || SparkExecutionMode.YARN_CLIENT == application.getSparkExecutionMode()) {
             buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
+            application.setJobManagerUrl(YarnUtils.getRMWebAppURL(true));
         }
 
         // Get the args after placeholder replacement
@@ -401,7 +337,6 @@ public class SparkApplicationActionServiceImpl
             .supplyAsync(() -> SparkClient.submit(submitRequest), executorService);
 
         startFutureMap.put(application.getId(), future);
-
         future.whenComplete(
             (response, throwable) -> {
                 // 1) remove Future
@@ -417,16 +352,17 @@ public class SparkApplicationActionServiceImpl
                         doStopped(application.getId());
                     } else {
                         SparkApplication app = getById(appParam.getId());
-                        app.setState(FlinkAppStateEnum.FAILED.getValue());
-                        app.setOptionState(OptionStateEnum.NONE.getValue());
+                        app.setState(SparkAppStateEnum.FAILED.getValue());
+                        app.setOptionState(SparkOptionStateEnum.NONE.getValue());
                         updateById(app);
-                        FlinkAppHttpWatcher.unWatching(appParam.getId());
+                        SparkAppHttpWatcher.unWatching(appParam.getId());
                     }
                     return;
                 }
 
                 // 3) success
                 applicationLog.setSuccess(true);
+                // TODO: replace with the corresponding Spark parameters
                 if (response.sparkConfig() != null) {
                     String jmMemory = response.sparkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
                     if (jmMemory != null) {
@@ -438,21 +374,21 @@ public class SparkApplicationActionServiceImpl
                     }
                 }
                 application.setAppId(response.clusterId());
-                if (StringUtils.isNoneEmpty(response.jobId())) {
-                    application.setJobId(response.jobId());
+                if (StringUtils.isNoneEmpty(response.sparkAppId())) {
+                    application.setJobId(response.sparkAppId());
                 }
 
                 if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
                     application.setJobManagerUrl(response.jobManagerUrl());
-                    applicationLog.setJobManagerUrl(response.jobManagerUrl());
+                    applicationLog.setTrackUrl(response.jobManagerUrl());
                 }
-                applicationLog.setYarnAppId(response.clusterId());
+                applicationLog.setSparkAppId(response.sparkAppId());
                 application.setStartTime(new Date());
                 application.setEndTime(null);
 
                 // if start completed, will be added task to tracking queue
-                FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.STARTING);
-                // FlinkAppHttpWatcher.doWatching(application);
+                SparkAppHttpWatcher.setOptionState(appParam.getId(), SparkOptionStateEnum.STARTING);
+                SparkAppHttpWatcher.doWatching(application);
 
                 // update app
                 updateById(application);
@@ -487,7 +423,7 @@ public class SparkApplicationActionServiceImpl
     }
 
     private void starting(SparkApplication application) {
-        application.setState(FlinkAppStateEnum.STARTING.getValue());
+        application.setState(SparkAppStateEnum.STARTING.getValue());
         application.setOptionTime(new Date());
         updateById(application);
     }
@@ -508,8 +444,7 @@ public class SparkApplicationActionServiceImpl
                 FlinkSql flinkSql = 
flinkSqlService.getEffective(application.getId(), false);
                 AssertUtils.notNull(flinkSql);
                 // 1) dist_userJar
-                // todo
-                String sqlDistJar = serviceHelper.getFlinkSqlClientJar(null);
+                String sqlDistJar = 
serviceHelper.getSparkSqlClientJar(sparkEnv);
                 // 2) appConfig
                 appConf = applicationConfig == null
                     ? null
@@ -598,18 +533,7 @@ public class SparkApplicationActionServiceImpl
 
     private Map<String, Object> getProperties(SparkApplication application) {
         Map<String, Object> properties = new 
HashMap<>(application.getOptionMap());
-        if 
(SparkExecutionMode.isRemoteMode(application.getSparkExecutionMode())) {
-            FlinkCluster cluster = 
flinkClusterService.getById(application.getSparkClusterId());
-            ApiAlertException.throwIfNull(
-                cluster,
-                String.format(
-                    "The clusterId=%s can't be find, maybe the clusterId is 
wrong or "
-                        + "the cluster has been deleted. Please contact the 
Admin.",
-                    application.getSparkClusterId()));
-            URI activeAddress = cluster.getRemoteURI();
-            properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
-            properties.put(RestOptions.PORT.key(), activeAddress.getPort());
-        } else if 
(SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) {
+        if 
(SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) {
             String yarnQueue = (String) 
application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_QUEUE());
             String yarnLabelExpr = (String) 
application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_NODE_LABEL());
             Optional.ofNullable(yarnQueue)
@@ -618,29 +542,19 @@ public class SparkApplicationActionServiceImpl
                 .ifPresent(yLabel -> 
properties.put(ConfigKeys.KEY_YARN_APP_NODE_LABEL(), yLabel));
         }
 
-        if (application.getAllowNonRestored()) {
-            
properties.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), 
true);
-        }
-
         Map<String, String> dynamicProperties = PropertiesUtils
             
.extractDynamicPropertiesAsJava(application.getDynamicProperties());
         properties.putAll(dynamicProperties);
-        ResolveOrder resolveOrder = 
ResolveOrder.of(application.getResolveOrder());
-        if (resolveOrder != null) {
-            properties.put(CoreOptions.CLASSLOADER_RESOLVE_ORDER.key(), 
resolveOrder.getName());
-        }
-
         return properties;
     }
 
     private void doStopped(Long id) {
         SparkApplication application = getById(id);
-        application.setOptionState(OptionStateEnum.NONE.getValue());
-        application.setState(FlinkAppStateEnum.CANCELED.getValue());
+        application.setOptionState(SparkOptionStateEnum.NONE.getValue());
+        application.setState(SparkAppStateEnum.KILLED.getValue());
         application.setOptionTime(new Date());
         updateById(application);
-        // re-tracking flink job on kubernetes and logging exception
-        FlinkAppHttpWatcher.unWatching(application.getId());
+        SparkAppHttpWatcher.unWatching(application.getId());
         // kill application
         if 
(SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) {
             try {
@@ -655,18 +569,14 @@ public class SparkApplicationActionServiceImpl
         }
     }
 
-    /* check flink cluster before job start job */
-    private void checkBeforeStart(SparkApplication application) {
-        SparkEnv sparkEnv = sparkEnvService.getByAppId(application.getId());
-        ApiAlertException.throwIfNull(sparkEnv, "[StreamPark] can no found 
flink version");
-
+    /* check the yarn cluster state before starting the job */
+    private void checkYarnBeforeStart(SparkApplication application) {
+        STATE yarnState = HadoopUtils.yarnClient().getServiceState();
         ApiAlertException.throwIfFalse(
-            flinkClusterService.existsByFlinkEnvId(sparkEnv.getId()),
-            "[StreamPark] The flink cluster don't exist, please check it");
-
-        FlinkCluster flinkCluster = 
flinkClusterService.getById(application.getSparkClusterId());
-        ApiAlertException.throwIfFalse(
-            flinkClusterWatcher.getClusterState(flinkCluster) == 
ClusterState.RUNNING,
-            "[StreamPark] The flink cluster not running, please start it");
+            yarnState == STARTED,
+            "[StreamPark] The yarn cluster service state is " + 
yarnState.name() + ", please check it");
+        ApiAlertException.throwIfTrue(
+            
!applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(),
+            "[StreamPark] The same task name is already running in the yarn 
queue");
     }
 }
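
Note on the new pre-start guard: checkYarnBeforeStart replaces the old Flink cluster check with two YARN-side conditions, i.e. the ResourceManager client service must be STARTED and no active application may already carry the same job name. A minimal standalone sketch of that pattern, assuming only Hadoop's YarnClient API (the class and method names below are illustrative and not part of this commit):

    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.service.Service;
    import org.apache.hadoop.yarn.api.records.YarnApplicationState;
    import org.apache.hadoop.yarn.client.api.YarnClient;

    // Illustrative only; StreamPark reuses a shared client via HadoopUtils.yarnClient().
    public class YarnPreStartCheck {

        public static void checkBeforeStart(String jobName) throws Exception {
            YarnClient yarnClient = YarnClient.createYarnClient();
            yarnClient.init(new Configuration());
            yarnClient.start();
            try {
                // 1) the yarn client service must be in the STARTED state
                Service.STATE state = yarnClient.getServiceState();
                if (state != Service.STATE.STARTED) {
                    throw new IllegalStateException("yarn service state is " + state);
                }
                // 2) no active application may already use the same job name
                boolean duplicate = yarnClient
                    .getApplications(EnumSet.of(
                        YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING))
                    .stream()
                    .anyMatch(report -> jobName.equals(report.getName()));
                if (duplicate) {
                    throw new IllegalStateException("job name already running: " + jobName);
                }
            } finally {
                yarnClient.stop();
            }
        }
    }
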
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java
index 58a5bb125..4d4aba24e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java
@@ -26,27 +26,19 @@ import org.apache.streampark.common.util.ExceptionUtils;
 import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.common.util.YarnUtils;
-import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
 import org.apache.streampark.console.base.exception.ApplicationException;
-import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.entity.SparkApplication;
 import org.apache.streampark.console.core.entity.SparkEnv;
 import org.apache.streampark.console.core.enums.AppExistsStateEnum;
 import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
+import org.apache.streampark.console.core.enums.SparkAppStateEnum;
 import org.apache.streampark.console.core.mapper.SparkApplicationMapper;
-import org.apache.streampark.console.core.metrics.flink.JobsOverview;
 import org.apache.streampark.console.core.runner.EnvInitializer;
-import org.apache.streampark.console.core.service.FlinkClusterService;
-import org.apache.streampark.console.core.service.SavePointService;
 import org.apache.streampark.console.core.service.SparkEnvService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
-import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
-import org.apache.streampark.console.core.watcher.FlinkClusterWatcher;
+import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher;
 import org.apache.streampark.flink.core.conf.ParameterCli;
-import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
-import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -96,94 +88,16 @@ public class SparkApplicationInfoServiceImpl
     @Autowired
     private SparkEnvService sparkEnvService;
 
-    @Autowired
-    private SavePointService savePointService;
-
     @Autowired
     private EnvInitializer envInitializer;
 
-    @Autowired
-    private FlinkK8sWatcher k8SFlinkTrackMonitor;
-
-    @Autowired
-    private FlinkClusterService flinkClusterService;
-
-    @Autowired
-    private FlinkClusterWatcher flinkClusterWatcher;
-
     @Override
     public Map<String, Serializable> getDashboardDataMap(Long teamId) {
-        JobsOverview.Task overview = new JobsOverview.Task();
-        Integer totalJmMemory = 0;
-        Integer totalTmMemory = 0;
-        Integer totalTm = 0;
-        Integer totalSlot = 0;
-        Integer availableSlot = 0;
-        Integer runningJob = 0;
-
-        // stat metrics from other than kubernetes mode
-        for (Application app : FlinkAppHttpWatcher.getWatchingApps()) {
-            if (!teamId.equals(app.getTeamId())) {
-                continue;
-            }
-            if (app.getJmMemory() != null) {
-                totalJmMemory += app.getJmMemory();
-            }
-            if (app.getTmMemory() != null) {
-                totalTmMemory += app.getTmMemory() * (app.getTotalTM() == null 
? 1 : app.getTotalTM());
-            }
-            if (app.getTotalTM() != null) {
-                totalTm += app.getTotalTM();
-            }
-            if (app.getTotalSlot() != null) {
-                totalSlot += app.getTotalSlot();
-            }
-            if (app.getAvailableSlot() != null) {
-                availableSlot += app.getAvailableSlot();
-            }
-            if (app.getState() == FlinkAppStateEnum.RUNNING.getValue()) {
-                runningJob++;
-            }
-            JobsOverview.Task task = app.getOverview();
-            if (task != null) {
-                overview.setTotal(overview.getTotal() + task.getTotal());
-                overview.setCreated(overview.getCreated() + task.getCreated());
-                overview.setScheduled(overview.getScheduled() + 
task.getScheduled());
-                overview.setDeploying(overview.getDeploying() + 
task.getDeploying());
-                overview.setRunning(overview.getRunning() + task.getRunning());
-                overview.setFinished(overview.getFinished() + 
task.getFinished());
-                overview.setCanceling(overview.getCanceling() + 
task.getCanceling());
-                overview.setCanceled(overview.getCanceled() + 
task.getCanceled());
-                overview.setFailed(overview.getFailed() + task.getFailed());
-                overview.setReconciling(overview.getReconciling() + 
task.getReconciling());
-            }
-        }
-
-        // merge metrics from flink kubernetes cluster
-        FlinkMetricCV k8sMetric = 
k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString());
-        if (k8sMetric != null) {
-            totalJmMemory += k8sMetric.totalJmMemory();
-            totalTmMemory += k8sMetric.totalTmMemory();
-            totalTm += k8sMetric.totalTm();
-            totalSlot += k8sMetric.totalSlot();
-            availableSlot += k8sMetric.availableSlot();
-            runningJob += k8sMetric.runningJob();
-            overview.setTotal(overview.getTotal() + k8sMetric.totalJob());
-            overview.setRunning(overview.getRunning() + 
k8sMetric.runningJob());
-            overview.setFinished(overview.getFinished() + 
k8sMetric.finishedJob());
-            overview.setCanceled(overview.getCanceled() + 
k8sMetric.cancelledJob());
-            overview.setFailed(overview.getFailed() + k8sMetric.failedJob());
-        }
 
         // result json
         Map<String, Serializable> dashboardDataMap = new HashMap<>(8);
-        dashboardDataMap.put("task", overview);
-        dashboardDataMap.put("jmMemory", totalJmMemory);
-        dashboardDataMap.put("tmMemory", totalTmMemory);
-        dashboardDataMap.put("totalTM", totalTm);
-        dashboardDataMap.put("availableSlot", availableSlot);
-        dashboardDataMap.put("totalSlot", totalSlot);
-        dashboardDataMap.put("runningJob", runningJob);
+        // TODO: Tasks running metrics for presentation
+        // dashboardDataMap.put("metrics key", "metrics value");
 
         return dashboardDataMap;
     }
@@ -203,14 +117,6 @@ public class SparkApplicationInfoServiceImpl
             }
             envInitializer.checkSparkEnv(application.getStorageType(), 
sparkEnv);
             envInitializer.storageInitialize(application.getStorageType());
-
-            if (SparkExecutionMode.REMOTE == 
application.getSparkExecutionMode()) {
-                FlinkCluster flinkCluster = 
flinkClusterService.getById(application.getSparkClusterId());
-                boolean conned = 
flinkClusterWatcher.verifyClusterConnection(flinkCluster);
-                if (!conned) {
-                    throw new ApiAlertException("the target cluster is 
unavailable, please check!");
-                }
-            }
             return true;
         } catch (Exception e) {
             log.error(ExceptionUtils.stringifyException(e));
@@ -221,10 +127,10 @@ public class SparkApplicationInfoServiceImpl
     @Override
     public boolean checkAlter(SparkApplication appParam) {
         Long appId = appParam.getId();
-        if (FlinkAppStateEnum.CANCELED != appParam.getStateEnum()) {
+        if (SparkAppStateEnum.KILLED != appParam.getStateEnum()) {
             return false;
         }
-        long cancelUserId = FlinkAppHttpWatcher.getCanceledJobUserId(appId);
+        long cancelUserId = SparkAppHttpWatcher.getCanceledJobUserId(appId);
         long appUserId = appParam.getUserId();
         return cancelUserId != -1 && cancelUserId != appUserId;
     }
@@ -241,37 +147,6 @@ public class SparkApplicationInfoServiceImpl
             new 
LambdaQueryWrapper<SparkApplication>().eq(SparkApplication::getUserId, userId));
     }
 
-    @Override
-    public boolean existsRunningByClusterId(Long clusterId) {
-        return baseMapper.existsRunningJobByClusterId(clusterId)
-            || FlinkAppHttpWatcher.getWatchingApps().stream()
-                .anyMatch(
-                    application -> 
clusterId.equals(application.getFlinkClusterId())
-                        && FlinkAppStateEnum.RUNNING == application
-                            .getStateEnum());
-    }
-
-    @Override
-    public boolean existsByClusterId(Long clusterId) {
-        return baseMapper.exists(
-            new LambdaQueryWrapper<SparkApplication>()
-                .eq(SparkApplication::getSparkClusterId, clusterId));
-    }
-
-    @Override
-    public Integer countByClusterId(Long clusterId) {
-        return baseMapper
-            .selectCount(
-                new LambdaQueryWrapper<SparkApplication>()
-                    .eq(SparkApplication::getSparkClusterId, clusterId))
-            .intValue();
-    }
-
-    @Override
-    public Integer countAffectedByClusterId(Long clusterId, String dbType) {
-        return baseMapper.countAffectedByClusterId(clusterId, dbType);
-    }
-
     @Override
     public boolean existsBySparkEnvId(Long sparkEnvId) {
         return baseMapper.exists(
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
index 94b9abf8a..36d63d037 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
@@ -32,11 +32,11 @@ import org.apache.streampark.console.base.util.WebUtils;
 import org.apache.streampark.console.core.bean.Dependency;
 import org.apache.streampark.console.core.entity.AppBuildPipeline;
 import org.apache.streampark.console.core.entity.ApplicationConfig;
-import org.apache.streampark.console.core.entity.ApplicationLog;
 import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Message;
 import org.apache.streampark.console.core.entity.Resource;
 import org.apache.streampark.console.core.entity.SparkApplication;
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
 import org.apache.streampark.console.core.entity.SparkEnv;
 import org.apache.streampark.console.core.enums.CandidateTypeEnum;
 import org.apache.streampark.console.core.enums.NoticeTypeEnum;
@@ -46,17 +46,17 @@ import 
org.apache.streampark.console.core.enums.ResourceTypeEnum;
 import 
org.apache.streampark.console.core.mapper.ApplicationBuildPipelineMapper;
 import org.apache.streampark.console.core.service.ApplicationBackUpService;
 import org.apache.streampark.console.core.service.ApplicationConfigService;
-import org.apache.streampark.console.core.service.ApplicationLogService;
 import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.MessageService;
 import org.apache.streampark.console.core.service.ResourceService;
 import org.apache.streampark.console.core.service.ServiceHelper;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.SparkAppBuildPipeService;
+import org.apache.streampark.console.core.service.SparkApplicationLogService;
 import org.apache.streampark.console.core.service.SparkEnvService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
-import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
+import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher;
 import org.apache.streampark.flink.packer.maven.Artifact;
 import org.apache.streampark.flink.packer.maven.DependencyInfo;
 import org.apache.streampark.flink.packer.pipeline.BuildPipeline;
@@ -129,10 +129,10 @@ public class SparkAppBuildPipeServiceImpl
     private SparkApplicationInfoService applicationInfoService;
 
     @Autowired
-    private ApplicationLogService applicationLogService;
+    private SparkApplicationLogService applicationLogService;
 
     @Autowired
-    private FlinkAppHttpWatcher flinkAppHttpWatcher;
+    private SparkAppHttpWatcher sparkAppHttpWatcher;
 
     @Autowired
     private ApplicationConfigService applicationConfigService;
@@ -157,7 +157,7 @@ public class SparkAppBuildPipeServiceImpl
         checkBuildEnv(appId, forceBuild);
 
         SparkApplication app = applicationManageService.getById(appId);
-        ApplicationLog applicationLog = new ApplicationLog();
+        SparkApplicationLog applicationLog = new SparkApplicationLog();
         applicationLog.setOptionName(RELEASE.getValue());
         applicationLog.setAppId(app.getId());
         applicationLog.setOptionTime(new Date());
@@ -202,8 +202,8 @@ public class SparkAppBuildPipeServiceImpl
                     app.setRelease(ReleaseStateEnum.RELEASING.get());
                     applicationManageService.updateRelease(app);
 
-                    if (flinkAppHttpWatcher.isWatchingApp(app.getId())) {
-                        flinkAppHttpWatcher.init();
+                    if (sparkAppHttpWatcher.isWatchingApp(app.getId())) {
+                        sparkAppHttpWatcher.init();
                     }
 
                     // 1) checkEnv
@@ -331,8 +331,8 @@ public class SparkAppBuildPipeServiceImpl
                     }
                     applicationManageService.updateRelease(app);
                     applicationLogService.save(applicationLog);
-                    if (flinkAppHttpWatcher.isWatchingApp(app.getId())) {
-                        flinkAppHttpWatcher.init();
+                    if (sparkAppHttpWatcher.isWatchingApp(app.getId())) {
+                        sparkAppHttpWatcher.init();
                     }
                 }
             });
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationLogServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationLogServiceImpl.java
new file mode 100644
index 000000000..cb6e338af
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationLogServiceImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.SparkApplicationLog;
+import org.apache.streampark.console.core.mapper.SparkApplicationLogMapper;
+import org.apache.streampark.console.core.service.SparkApplicationLogService;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+@Slf4j
+@Service
+@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, 
rollbackFor = Exception.class)
+public class SparkApplicationLogServiceImpl extends 
ServiceImpl<SparkApplicationLogMapper, SparkApplicationLog>
+    implements
+        SparkApplicationLogService {
+
+    @Override
+    public IPage<SparkApplicationLog> getPage(SparkApplicationLog 
sparkApplicationLog, RestRequest request) {
+        request.setSortField("option_time");
+        Page<SparkApplicationLog> page = MybatisPager.getPage(request);
+        LambdaQueryWrapper<SparkApplicationLog> queryWrapper = new 
LambdaQueryWrapper<SparkApplicationLog>()
+            .eq(SparkApplicationLog::getAppId, sparkApplicationLog.getAppId());
+        return this.page(page, queryWrapper);
+    }
+
+    @Override
+    public void removeByAppId(Long appId) {
+        LambdaQueryWrapper<SparkApplicationLog> queryWrapper = new 
LambdaQueryWrapper<SparkApplicationLog>()
+            .eq(SparkApplicationLog::getAppId, appId);
+        this.remove(queryWrapper);
+    }
+}
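
SparkApplicationLogServiceImpl delegates paging to MyBatis-Plus; MybatisPager.getPage only turns the RestRequest into a Page object, with option_time as the sort field. A minimal caller-side sketch of the equivalent query, assuming the getter names of the SparkApplicationLog entity above (the example class itself is hypothetical):

    import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
    import com.baomidou.mybatisplus.core.metadata.IPage;
    import com.baomidou.mybatisplus.extension.plugins.pagination.Page;

    public class SparkLogPagingExample {

        // fetch page 1 (10 records) of the logs for one application,
        // newest operation first
        public static IPage<SparkApplicationLog> firstPage(
                SparkApplicationLogServiceImpl logService, Long appId) {
            Page<SparkApplicationLog> page = new Page<>(1, 10);
            LambdaQueryWrapper<SparkApplicationLog> wrapper =
                new LambdaQueryWrapper<SparkApplicationLog>()
                    .eq(SparkApplicationLog::getAppId, appId)
                    .orderByDesc(SparkApplicationLog::getOptionTime);
            return logService.page(page, wrapper);
        }
    }
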
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
new file mode 100644
index 000000000..c7e447cd2
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.watcher;
+
+import org.apache.streampark.common.util.YarnUtils;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.base.util.Tuple2;
+import org.apache.streampark.console.base.util.Tuple3;
+import org.apache.streampark.console.core.bean.AlertTemplate;
+import org.apache.streampark.console.core.entity.SparkApplication;
+import org.apache.streampark.console.core.enums.SparkAppStateEnum;
+import org.apache.streampark.console.core.enums.SparkOptionStateEnum;
+import org.apache.streampark.console.core.enums.StopFromEnum;
+import org.apache.streampark.console.core.metrics.spark.Job;
+import org.apache.streampark.console.core.metrics.spark.SparkExecutor;
+import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.alert.AlertService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationActionService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.hc.core5.util.Timeout;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@Component
+public class SparkAppHttpWatcher {
+
+    @Autowired
+    private SparkApplicationManageService applicationManageService;
+
+    @Autowired
+    private SparkApplicationActionService applicationActionService;
+
+    @Autowired
+    private SparkApplicationInfoService applicationInfoService;
+
+    @Autowired
+    private AlertService alertService;
+
+    @Qualifier("sparkRestAPIWatchingExecutor")
+    @Autowired
+    private Executor executorService;
+
+    // tracking interval: poll every 5 seconds
+    public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
+
+    // option window: keep fast polling for 10 seconds after an operation
+    private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10);
+
+    private static final Timeout HTTP_TIMEOUT = Timeout.ofSeconds(5);
+
+    /**
+     * Records tasks that are tracked for the first time: after a task starts, its
+     * overview is fetched on the first tracking pass.
+     */
+    private static final Cache<Long, Byte> STARTING_CACHE =
+        Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
+
+    /** tracking task list */
+    private static final Map<Long, SparkApplication> WATCHING_APPS = new 
ConcurrentHashMap<>(0);
+
+    /**
+     * <pre>
+     * StopFrom: records whether a spark application was stopped by streampark or by another action
+     * </pre>
+     */
+    private static final Map<Long, StopFromEnum> STOP_FROM_MAP = new 
ConcurrentHashMap<>(0);
+
+    /**
+     * Canceled-task tracking list; records who canceled each tracked task. Map<applicationId, userId>
+     */
+    private static final Map<Long, Long> CANCELLED_JOB_MAP = new 
ConcurrentHashMap<>(0);
+
+    private static final Map<Long, SparkOptionStateEnum> OPTIONING = new 
ConcurrentHashMap<>(0);
+
+    private Long lastWatchTime = 0L;
+
+    private Long lastOptionTime = 0L;
+
+    private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0");
+
+    @PostConstruct
+    public void init() {
+        WATCHING_APPS.clear();
+        List<SparkApplication> applications =
+            applicationManageService.list(
+                new LambdaQueryWrapper<SparkApplication>()
+                    .eq(SparkApplication::getTracking, 1)
+                    .ne(SparkApplication::getState, 
SparkAppStateEnum.LOST.getValue()));
+        applications.forEach(
+            (app) -> {
+                WATCHING_APPS.put(app.getId(), app);
+                STARTING_CACHE.put(app.getId(), DEFAULT_FLAG_BYTE);
+            });
+    }
+
+    @PreDestroy
+    public void doStop() {
+        log.info(
+            "[StreamPark][SparkAppHttpWatcher] StreamPark Console will be 
shutdown, persistent application to database.");
+        WATCHING_APPS.forEach((k, v) -> 
applicationManageService.persistMetrics(v));
+    }
+
+    /**
+     * <strong>NOTE: polling only runs when one of the following conditions holds</strong>
+     *
+     * <p><strong>1) The program has just started, or a task was operated from the page
+     * (e.g. start/stop) and its state must be returned immediately: polled once per
+     * second, for up to 10 seconds (10 times).</strong>
+     *
+     * <p><strong>2) Otherwise, normal information is fetched once every 5 seconds.</strong>
+     */
+    @Scheduled(fixedDelay = 1000)
+    public void start() {
+        Long timeMillis = System.currentTimeMillis();
+        if (lastWatchTime == null
+            || !OPTIONING.isEmpty()
+            || timeMillis - lastOptionTime <= OPTION_INTERVAL.toMillis()
+            || timeMillis - lastWatchTime >= WATCHING_INTERVAL.toMillis()) {
+            lastWatchTime = timeMillis;
+            WATCHING_APPS.forEach(this::watch);
+        }
+    }
+
+    @VisibleForTesting
+    public @Nullable SparkAppStateEnum tryQuerySparkAppState(@Nonnull Long 
appId) {
+        SparkApplication app = WATCHING_APPS.get(appId);
+        return (app == null || app.getState() == null) ? null : 
app.getStateEnum();
+    }
+
+    private void watch(Long id, SparkApplication application) {
+        executorService.execute(
+            () -> {
+                try {
+                    getStateFromYarn(application);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+    }
+
+    private StopFromEnum getAppStopFrom(Long appId) {
+        return STOP_FROM_MAP.getOrDefault(appId, StopFromEnum.NONE);
+    }
+
+    /**
+     * Queries the job state from yarn, and the resource usage from spark when the job state is RUNNING.
+     *
+     * @param application spark application
+     */
+    private void getStateFromYarn(SparkApplication application) throws 
Exception {
+        SparkOptionStateEnum optionStateEnum = 
OPTIONING.get(application.getId());
+
+        // query the status from the yarn rest Api
+        YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
+        if (yarnAppInfo == null) {
+            throw new RuntimeException("[StreamPark][SparkAppHttpWatcher] 
getStateFromYarn failed!");
+        } else {
+            try {
+                String state = yarnAppInfo.getApp().getState();
+                SparkAppStateEnum sparkAppStateEnum = 
SparkAppStateEnum.of(state);
+                if (SparkAppStateEnum.OTHER == sparkAppStateEnum) {
+                    return;
+                }
+                if 
(SparkAppStateEnum.isEndState(sparkAppStateEnum.getValue())) {
+                    log.info(
+                        "[StreamPark][SparkAppHttpWatcher] getStateFromYarn, 
app {} was ended, jobId is {}, state is {}",
+                        application.getId(),
+                        application.getJobId(),
+                        sparkAppStateEnum);
+                    application.setEndTime(new Date());
+                }
+                if (SparkAppStateEnum.RUNNING == sparkAppStateEnum) {
+                    Tuple3<Double, Double, Long> resourceStatus = 
getResourceStatus(application);
+                    double memoryUsed = resourceStatus.t1;
+                    double maxMemory = resourceStatus.t2;
+                    double totalCores = resourceStatus.t3;
+                    log.info(
+                        "[StreamPark][SparkAppHttpWatcher] getStateFromYarn, 
app {} was running, jobId is {}, memoryUsed: {}MB, maxMemory: {}MB, totalCores: 
{}",
+                        application.getId(),
+                        application.getJobId(),
+                        String.format("%.2f", memoryUsed),
+                        String.format("%.2f", maxMemory),
+                        totalCores);
+                    // TODO: Modify the table structure to persist the results
+                }
+                application.setState(sparkAppStateEnum.getValue());
+                cleanOptioning(optionStateEnum, application.getId());
+                doPersistMetrics(application, false);
+                if (SparkAppStateEnum.FAILED == sparkAppStateEnum
+                    || SparkAppStateEnum.LOST == sparkAppStateEnum
+                    || applicationInfoService.checkAlter(application)) {
+                    doAlert(application, sparkAppStateEnum);
+                    if (SparkAppStateEnum.FAILED == sparkAppStateEnum) {
+                        applicationActionService.start(application, true);
+                    }
+                }
+            } catch (Exception e) {
+                throw new RuntimeException("[StreamPark][SparkAppHttpWatcher] getStateFromYarn failed!", e);
+            }
+        }
+    }
+
+    /**
+     * Calculates spark task progress from the Spark REST API (proxied by yarn). Only
+     * available when the yarn application status is RUNNING.
+     *
+     * @param application
+     * @return task progress
+     * @throws Exception
+     */
+    private double getTasksProgress(SparkApplication application) throws 
Exception {
+        Job[] jobs = httpJobsStatus(application);
+        if (jobs.length == 0) {
+            return 0.0;
+        }
+        Optional<Tuple2<Integer, Integer>> jobsSumOption =
+            Arrays.stream(jobs)
+                .map(job -> new Tuple2<>(job.getNumCompletedTasks(), 
job.getNumTasks()))
+                .reduce((val1, val2) -> new Tuple2<>(val1.t1 + val2.t1, 
val1.t2 + val2.t2));
+        Tuple2<Integer, Integer> jobsSum = jobsSumOption.get();
+        return jobsSum.t1 * 1.0 / jobsSum.t2;
+    }
+
+    private Tuple3<Double, Double, Long> getResourceStatus(SparkApplication 
application) throws Exception {
+        SparkExecutor[] executors = httpExecutorsStatus(application);
+        if (executors.length == 0) {
+            return new Tuple3<>(0.0, 0.0, 0L);
+        }
+        SparkExecutor totalExecutor =
+            Arrays.stream(executors)
+                .reduce(
+                    (e1, e2) -> {
+                        SparkExecutor temp = new SparkExecutor();
+                        temp.setMemoryUsed(e1.getMemoryUsed() + 
e2.getMemoryUsed());
+                        temp.setMaxMemory(e1.getMaxMemory() + 
e2.getMaxMemory());
+                        temp.setTotalCores(e1.getTotalCores() + 
e2.getTotalCores());
+                        return temp;
+                    })
+                .get();
+        return new Tuple3<>(
+            totalExecutor.getMemoryUsed() * 1.0 / 1024 / 1024,
+            totalExecutor.getMaxMemory() * 1.0 / 1024 / 1024,
+            totalExecutor.getTotalCores());
+    }
+
+    private void doPersistMetrics(SparkApplication application, boolean 
stopWatch) {
+        if (SparkAppStateEnum.isEndState(application.getState())) {
+            application.setOverview(null);
+            application.setTotalTM(null);
+            application.setTotalSlot(null);
+            application.setTotalTask(null);
+            application.setAvailableSlot(null);
+            application.setJmMemory(null);
+            application.setTmMemory(null);
+            unWatching(application.getId());
+        } else if (stopWatch) {
+            unWatching(application.getId());
+        } else {
+            WATCHING_APPS.put(application.getId(), application);
+        }
+        applicationManageService.persistMetrics(application);
+    }
+
+    private void cleanOptioning(SparkOptionStateEnum optionStateEnum, Long 
key) {
+        if (optionStateEnum != null) {
+            lastOptionTime = System.currentTimeMillis();
+            OPTIONING.remove(key);
+        }
+    }
+
+    /** set current option state */
+    public static void setOptionState(Long appId, SparkOptionStateEnum state) {
+        log.info("[StreamPark][SparkAppHttpWatcher]  setOptioning");
+        OPTIONING.put(appId, state);
+        if (SparkOptionStateEnum.STOPPING == state) {
+            STOP_FROM_MAP.put(appId, StopFromEnum.STREAMPARK);
+        }
+    }
+
+    public static void doWatching(SparkApplication application) {
+        log.info(
+            "[StreamPark][SparkAppHttpWatcher] add app to tracking, appId:{}", 
application.getId());
+        WATCHING_APPS.put(application.getId(), application);
+        STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
+    }
+
+    public static void unWatching(Long appId) {
+        log.info("[StreamPark][SparkAppHttpWatcher] stop app, appId:{}", 
appId);
+        WATCHING_APPS.remove(appId);
+    }
+
+    public static void addCanceledApp(Long appId, Long userId) {
+        log.info(
+            "[StreamPark][SparkAppHttpWatcher] addCanceledApp app appId:{}, 
useId:{}", appId, userId);
+        CANCELLED_JOB_MAP.put(appId, userId);
+    }
+
+    public static Long getCanceledJobUserId(Long appId) {
+        return CANCELLED_JOB_MAP.get(appId) == null ? Long.valueOf(-1) : 
CANCELLED_JOB_MAP.get(appId);
+    }
+
+    public static Collection<SparkApplication> getWatchingApps() {
+        return WATCHING_APPS.values();
+    }
+
+    private YarnAppInfo httpYarnAppInfo(SparkApplication application) throws 
Exception {
+        String reqURL = "ws/v1/cluster/apps/".concat(application.getJobId());
+        return yarnRestRequest(reqURL, YarnAppInfo.class);
+    }
+
+    private Job[] httpJobsStatus(SparkApplication application) throws 
Exception {
+        String format = "proxy/%s/api/v1/applications/%s/jobs";
+        String reqURL = String.format(format, application.getJobId(), 
application.getJobId());
+        return yarnRestRequest(reqURL, Job[].class);
+    }
+
+    private SparkExecutor[] httpExecutorsStatus(SparkApplication application) 
throws Exception {
+        // "executor" is used for active executors only.
+        // "allexecutor" is used for all executors including the dead.
+        String format = "proxy/%s/api/v1/applications/%s/executors";
+        String reqURL = String.format(format, application.getJobId(), 
application.getJobId());
+        return yarnRestRequest(reqURL, SparkExecutor[].class);
+    }
+
+    private <T> T yarnRestRequest(String url, Class<T> clazz) throws 
IOException {
+        String result = YarnUtils.restRequest(url, HTTP_TIMEOUT);
+        if (null == result) {
+            return null;
+        }
+        return JacksonUtils.read(result, clazz);
+    }
+
+    public boolean isWatchingApp(Long id) {
+        return WATCHING_APPS.containsKey(id);
+    }
+
+    /**
+     * Sends an alert when a job running in yarn mode terminates abnormally.
+     *
+     * @param application spark application
+     * @param appState spark application state
+     */
+    private void doAlert(SparkApplication application, SparkAppStateEnum 
appState) {
+        AlertTemplate alertTemplate = AlertTemplate.of(application, appState);
+        alertService.alert(application.getAlertId(), alertTemplate);
+    }
+}
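
The @Scheduled(fixedDelay = 1000) tick in start() does not poll on every run; it gates polling on the two intervals declared at the top of the class. A self-contained sketch of that cadence (class and method names are illustrative):

    import java.time.Duration;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class WatchCadence {

        private static final Duration WATCH_INTERVAL = Duration.ofSeconds(5);
        private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10);

        // appId -> in-flight operation, mirrors the OPTIONING map above
        private final Map<Long, Object> optioning = new ConcurrentHashMap<>();
        private long lastWatchTime = 0L;
        private long lastOptionTime = 0L;

        // invoked once per second by a scheduler
        public boolean shouldPoll(long now) {
            boolean fastLane = !optioning.isEmpty()                    // operation in flight
                || now - lastOptionTime <= OPTION_INTERVAL.toMillis(); // or finished < 10s ago
            boolean slowLane = now - lastWatchTime >= WATCH_INTERVAL.toMillis(); // 5s elapsed
            if (fastLane || slowLane) {
                lastWatchTime = now;
                return true;
            }
            return false;
        }
    }

This gives start/stop operations a roughly one-second feedback loop for about 10 seconds, while steady-state tracking stays at the cheaper 5-second rate.
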
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
index 32d09b856..0abc5f4c9 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
@@ -32,15 +32,15 @@ object SparkClient extends Logger {
   private[this] val SUBMIT_REQUEST =
     "org.apache.streampark.spark.client.bean.SubmitRequest" -> "submit"
 
-  private[this] val CANCEL_REQUEST =
-    "org.apache.streampark.spark.client.bean.CancelRequest" -> "cancel"
+  private[this] val STOP_REQUEST =
+    "org.apache.streampark.spark.client.bean.StopRequest" -> "stop"
 
   def submit(submitRequest: SubmitRequest): SubmitResponse = {
     proxy[SubmitResponse](submitRequest, submitRequest.sparkVersion, 
SUBMIT_REQUEST)
   }
 
-  def cancel(stopRequest: CancelRequest): CancelResponse = {
-    proxy[CancelResponse](stopRequest, stopRequest.sparkVersion, 
CANCEL_REQUEST)
+  def stop(stopRequest: StopRequest): StopResponse = {
+    proxy[StopResponse](stopRequest, stopRequest.sparkVersion, STOP_REQUEST)
   }
 
   private[this] def proxy[T: ClassTag](
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
similarity index 96%
rename from 
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
rename to 
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
index 6b8e1bfa2..4e2ab56bc 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
@@ -24,12 +24,11 @@ import javax.annotation.Nullable
 
 import java.util.{Map => JavaMap}
 
-case class CancelRequest(
+case class StopRequest(
     id: Long,
     sparkVersion: SparkVersion,
     executionMode: SparkExecutionMode,
     @Nullable properties: JavaMap[String, Any],
-    clusterId: String,
     jobId: String,
     withDrain: Boolean,
     nativeFormat: Boolean)
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
similarity index 94%
copy from 
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
copy to 
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
index d293947da..c8655d19b 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
@@ -17,4 +17,4 @@
 
 package org.apache.streampark.spark.client.bean
 
-case class CancelResponse(savePointDir: String)
+case class StopResponse(savePointDir: String)
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
index d2582dd53..5ea75af01 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
@@ -24,5 +24,6 @@ import java.util.{Map => JavaMap}
 case class SubmitResponse(
     clusterId: String,
     sparkConfig: JavaMap[String, String],
+    var sparkAppId: String,
     @Nullable jobId: String = "",
     @Nullable jobManagerUrl: String = "")
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala
similarity index 75%
rename from 
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
rename to 
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala
index d293947da..99e97d3b6 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala
@@ -15,6 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.spark.client.bean
+package org.apache.streampark.spark.client.conf
 
-case class CancelResponse(savePointDir: String)
+object SparkConfiguration {
+  val defaultParameters = Map[String, Any](
+    "spark.driver.cores" -> "1",
+    "spark.driver.memory" -> "1g",
+    "spark.executor.cores" -> "1",
+    "spark.executor.memory" -> "1g")
+
+}
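
These defaults are merged under the user-supplied properties in setDynamicProperties (see the YarnApplicationClient change later in this diff), so user values always win and only "spark."-prefixed keys reach the launcher. A Java sketch of that merge, with the method name being hypothetical:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class SparkConfMerge {

        public static Map<String, String> effectiveConf(Map<String, String> userProps) {
            Map<String, String> conf = new LinkedHashMap<>();
            // defaults first ...
            conf.put("spark.driver.cores", "1");
            conf.put("spark.driver.memory", "1g");
            conf.put("spark.executor.cores", "1");
            conf.put("spark.executor.memory", "1g");
            // ... then user properties override them
            conf.putAll(userProps);
            // non "spark." keys are skipped (the launcher logs and ignores them)
            conf.keySet().removeIf(key -> !key.startsWith("spark."));
            return conf;
        }
    }
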
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
index 242806fb4..5cdfb9063 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
@@ -49,13 +49,13 @@ object SparkShimsProxy extends Logger {
   private[this] lazy val SPARK_SHIMS_PREFIX = "streampark-spark-shims_spark"
 
   def proxy[T](sparkVersion: SparkVersion, func: ClassLoader => T): T = {
-    val shimsClassLoader = getSParkShimsClassLoader(sparkVersion)
+    val shimsClassLoader = getSparkShimsClassLoader(sparkVersion)
     ClassLoaderUtils
       .runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
   }
 
   def proxy[T](sparkVersion: SparkVersion, func: JavaFunc[ClassLoader, T]): T 
= {
-    val shimsClassLoader = getSParkShimsClassLoader(sparkVersion)
+    val shimsClassLoader = getSparkShimsClassLoader(sparkVersion)
     ClassLoaderUtils
       .runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
   }
@@ -134,14 +134,14 @@ object SparkShimsProxy extends Logger {
       .runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
   }
 
-  private[this] def getSParkShimsClassLoader(sparkVersion: SparkVersion): 
ClassLoader = {
+  private[this] def getSparkShimsClassLoader(sparkVersion: SparkVersion): 
ClassLoader = {
     logInfo(s"add spark shims urls classloader,spark version: $sparkVersion")
 
     SHIMS_CLASS_LOADER_CACHE.getOrElseUpdate(
       s"${sparkVersion.fullVersion}", {
         // 1) spark/lib
-        val libURL = getSparkHomeLib(sparkVersion.sparkHome, "jars", 
!_.getName.startsWith("log4j"))
-        val shimsUrls = ListBuffer[URL](libURL: _*)
+        val libUrl = getSparkHomeLib(sparkVersion.sparkHome, "jars", f => 
!f.getName.startsWith("log4j") && !f.getName.startsWith("slf4j"))
+        val shimsUrls = ListBuffer[URL](libUrl: _*)
 
         // 2) add all shims jar
         addShimsUrls(
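
getSparkShimsClassLoader builds the shims classloader from $SPARK_HOME/jars, now excluding both log4j and slf4j jars so the client's own logging binding is not shadowed. A minimal Java sketch of the jar-collection step (the class below is illustrative; parent-last loading, caching, and the shims jars themselves stay in SparkShimsProxy):

    import java.io.File;
    import java.net.URL;
    import java.net.URLClassLoader;
    import java.util.ArrayList;
    import java.util.List;

    public class SparkLibClassLoader {

        public static ClassLoader fromSparkHome(String sparkHome) throws Exception {
            File jarsDir = new File(sparkHome, "jars");
            List<URL> urls = new ArrayList<>();
            // keep every jar except the logging providers
            File[] jars = jarsDir.listFiles((dir, name) -> name.endsWith(".jar")
                && !name.startsWith("log4j")
                && !name.startsWith("slf4j"));
            if (jars != null) {
                for (File jar : jars) {
                    urls.add(jar.toURI().toURL());
                }
            }
            return new URLClassLoader(urls.toArray(new URL[0]),
                Thread.currentThread().getContextClassLoader());
        }
    }
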
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
index 55e4fe081..437bf0ff2 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
@@ -38,12 +38,12 @@ object SparkClientEndpoint {
     }
   }
 
-  def cancel(cancelRequest: CancelRequest): CancelResponse = {
-    clients.get(cancelRequest.executionMode) match {
-      case Some(client) => client.cancel(cancelRequest)
+  def stop(stopRequest: StopRequest): StopResponse = {
+    clients.get(stopRequest.executionMode) match {
+      case Some(client) => client.stop(stopRequest)
       case _ =>
         throw new UnsupportedOperationException(
-          s"Unsupported ${cancelRequest.executionMode} cancel ")
+          s"Unsupported ${stopRequest.executionMode} stop ")
     }
   }
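
SparkClientEndpoint.stop keeps the same mode-keyed dispatch as submit: look the client up by execution mode and fail fast on unsupported modes. The equivalent shape in Java (interface and class names are illustrative):

    import java.util.Map;
    import java.util.Optional;

    public class ClientDispatch {

        interface SparkClient {
            String stop(String jobId);
        }

        private final Map<String, SparkClient> clients;

        ClientDispatch(Map<String, SparkClient> clients) {
            this.clients = clients;
        }

        String stop(String executionMode, String jobId) {
            return Optional.ofNullable(clients.get(executionMode))
                .map(client -> client.stop(jobId))
                .orElseThrow(() -> new UnsupportedOperationException(
                    "Unsupported " + executionMode + " stop"));
        }
    }
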
 
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
index 8f0a00e9a..9870799c0 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
@@ -18,17 +18,19 @@
 package org.apache.streampark.spark.client.impl
 
 import org.apache.streampark.common.conf.Workspace
+import org.apache.streampark.common.enums.SparkExecutionMode
+import org.apache.streampark.common.util.HadoopUtils
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
 import org.apache.streampark.spark.client.`trait`.SparkClientTrait
 import org.apache.streampark.spark.client.bean._
+import org.apache.streampark.spark.client.conf.SparkConfiguration
 
 import org.apache.commons.collections.MapUtils
+import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
 
 import java.util.concurrent.{CountDownLatch, Executors, ExecutorService}
 
-import scala.util.control.Breaks.break
-
 /** yarn application mode submit */
 object YarnApplicationClient extends SparkClientTrait {
 
@@ -36,7 +38,8 @@ object YarnApplicationClient extends SparkClientTrait {
 
   private[this] lazy val workspace = Workspace.remote
 
-  override def doCancel(cancelRequest: CancelRequest): CancelResponse = {
+  override def doStop(stopRequest: StopRequest): StopResponse = {
+    
HadoopUtils.yarnClient.killApplication(ApplicationId.fromString(stopRequest.jobId))
     null
   }
 
@@ -44,11 +47,9 @@ object YarnApplicationClient extends SparkClientTrait {
 
   override def doSubmit(submitRequest: SubmitRequest): SubmitResponse = {
     launch(submitRequest)
-    null
-
   }
 
-  private def launch(submitRequest: SubmitRequest): Unit = {
+  private def launch(submitRequest: SubmitRequest): SubmitResponse = {
     val launcher: SparkLauncher = new SparkLauncher()
       .setSparkHome(submitRequest.sparkVersion.sparkHome)
       .setAppResource(submitRequest.buildResult
@@ -56,69 +57,61 @@ object YarnApplicationClient extends SparkClientTrait {
         .shadedJarPath)
       .setMainClass(submitRequest.appMain)
       .setMaster("yarn")
-      .setDeployMode("cluster")
+      .setDeployMode(submitRequest.executionMode match {
+        case SparkExecutionMode.YARN_CLIENT => "client"
+        case SparkExecutionMode.YARN_CLUSTER => "cluster"
+        case _ =>
+          throw new IllegalArgumentException(
+            "[StreamPark][YarnApplicationClient] Yarn mode only support 
\"client\" and \"cluster\".")
+
+      })
       .setAppName(submitRequest.appName)
-      .setConf("spark.executor.memory", "5g")
-      .setConf("spark.executor.cores", "4")
-      .setConf("spark.num.executors", "1")
       .setConf(
         "spark.yarn.jars",
         submitRequest
-          .asInstanceOf[SubmitRequest]
           .hdfsWorkspace
           .sparkLib + "/*.jar")
       .setVerbose(true)
 
+    import scala.collection.JavaConverters._
+    setDynamicProperties(launcher, submitRequest.properties.asScala.toMap)
+
+    // TODO: Add command line arguments for the application.
+    // launcher.addAppArgs()
+
     if (MapUtils.isNotEmpty(submitRequest.extraParameter) && 
submitRequest.extraParameter
         .containsKey("sql")) {
       launcher.addAppArgs("--sql", 
submitRequest.extraParameter.get("sql").toString)
     }
 
-    logger.info("The spark task start")
+    logger.info("[StreamPark][YarnApplicationClient] The spark task start")
+    val cdlForApplicationId: CountDownLatch = new CountDownLatch(1)
 
+    var sparkAppHandle: SparkAppHandle = null
     threadPool.execute(new Runnable {
       override def run(): Unit = {
         try {
           val countDownLatch: CountDownLatch = new CountDownLatch(1)
-          val sparkAppHandle: SparkAppHandle =
-            launcher.startApplication(new SparkAppHandle.Listener() {
-              override def stateChanged(handle: SparkAppHandle): Unit = {
-                if (handle.getAppId != null) {
-                  logInfo(
-                    String.format("%s stateChanged :%s", handle.getAppId, 
handle.getState.toString))
-                } else logger.info("stateChanged :{}", 
handle.getState.toString)
-
-                if (SparkAppHandle.State.FAILED.toString == 
handle.getState.toString) {
-                  logger.error("Task run failure stateChanged :{}", 
handle.getState.toString)
+          sparkAppHandle = launcher.startApplication(new 
SparkAppHandle.Listener() {
+            override def stateChanged(handle: SparkAppHandle): Unit = {
+              if (handle.getAppId != null) {
+                if (cdlForApplicationId.getCount != 0) {
+                  cdlForApplicationId.countDown()
                 }
+                logger.info("{} stateChanged :{}", Array(handle.getAppId, 
handle.getState.toString))
+              } else logger.info("stateChanged :{}", handle.getState.toString)
 
-                if (handle.getState.isFinal) countDownLatch.countDown()
+              if (SparkAppHandle.State.FAILED.toString == 
handle.getState.toString) {
+                logger.error("Task run failure stateChanged :{}", 
handle.getState.toString)
               }
 
-              override def infoChanged(handle: SparkAppHandle): Unit = {}
-            })
-          logger.info(
-            "The task is executing, current is get application id 
before,please wait ........")
-          var applicationId: String = null
-          while ({
-            !(SparkAppHandle.State.RUNNING == sparkAppHandle.getState)
-          }) {
-            applicationId = sparkAppHandle.getAppId
-            if (applicationId != null) {
-              logInfo(
-                String.format(
-                  "handle current state is %s, appid is %s",
-                  sparkAppHandle.getState.toString,
-                  applicationId))
-              break // todo: break is not supported
-
+              if (handle.getState.isFinal) {
+                countDownLatch.countDown()
+              }
             }
-          }
-          logInfo(
-            String.format(
-              "handle current state is %s, appid is %s",
-              sparkAppHandle.getState.toString,
-              applicationId))
+
+            override def infoChanged(handle: SparkAppHandle): Unit = {}
+          })
           countDownLatch.await()
         } catch {
           case e: Exception =>
@@ -127,6 +120,23 @@ object YarnApplicationClient extends SparkClientTrait {
       }
     })
 
+    cdlForApplicationId.await()
+    logger.info(
+      "[StreamPark][YarnApplicationClient] The task is executing, handle 
current state is {}, appid is {}",
+      Array(sparkAppHandle.getState.toString, sparkAppHandle.getAppId))
+    SubmitResponse(null, null, sparkAppHandle.getAppId)
+  }
+
+  private def setDynamicProperties(sparkLauncher: SparkLauncher, properties: Map[String, Any]): Unit = {
+    logger.info("[StreamPark][YarnApplicationClient] Spark launcher start 
configuration.")
+    val finalProperties: Map[String, Any] = SparkConfiguration.defaultParameters ++ properties
+    for ((k, v) <- finalProperties) {
+      if (k.startsWith("spark.")) {
+        sparkLauncher.setConf(k, v.toString)
+      } else {
+        logger.info("[StreamPark][YarnApplicationClient] \"{}\" doesn't start 
with \"spark.\". Skip it.", k)
+      }
+    }
   }
 
 }
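
    Reviewer note: setDynamicProperties above forwards only keys with the
    "spark." prefix to SparkLauncher.setConf, with user-supplied properties
    overriding the defaults. Below is a minimal, self-contained sketch of that
    merge-and-filter rule; the defaults map is a stand-in for
    SparkConfiguration.defaultParameters, and all keys and values are
    illustrative, not the project's actual defaults.

        object DynamicPropertiesSketch extends App {
          // Stand-in for SparkConfiguration.defaultParameters (illustrative values only).
          val defaults: Map[String, Any] = Map(
            "spark.executor.memory" -> "1g",
            "spark.executor.cores" -> 1)

          // User-supplied properties win over defaults; non-"spark." keys are skipped.
          val userProps: Map[String, Any] = Map(
            "spark.executor.memory" -> "5g",
            "queue" -> "default")

          val finalProps: Map[String, Any] = defaults ++ userProps
          for ((k, v) <- finalProps) {
            if (k.startsWith("spark.")) {
              println(s"setConf($k, $v)") // here setDynamicProperties calls sparkLauncher.setConf(k, v.toString)
            } else {
              println(s"skip $k: doesn't start with spark.")
            }
          }
        }

    Running the sketch prints setConf lines for the two "spark." keys (with the
    user's 5g overriding the default 1g) and a skip line for "queue".
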
diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
index 75d6ea74d..93f32aad0 100644
--- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
+++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
@@ -28,7 +28,7 @@ trait SparkClientTrait extends Logger {
   def submit(submitRequest: SubmitRequest): SubmitResponse = {
     logInfo(
       s"""
-         |--------------------------------------- spark job start ---------------------------------------
+         |--------------------------------------- spark job start -----------------------------------
          |    userSparkHome    : ${submitRequest.sparkVersion.sparkHome}
          |    sparkVersion     : ${submitRequest.sparkVersion.version}
          |    appName          : ${submitRequest.appName}
@@ -57,27 +57,25 @@ trait SparkClientTrait extends Logger {
   def setConfig(submitRequest: SubmitRequest): Unit
 
   @throws[Exception]
-  def cancel(cancelRequest: CancelRequest): CancelResponse = {
+  def stop(stopRequest: StopRequest): StopResponse = {
     logInfo(
       s"""
-         |----------------------------------------- spark job cancel --------------------------------
-         |     userSparkHome     : ${cancelRequest.sparkVersion.sparkHome}
-         |     sparkVersion      : ${cancelRequest.sparkVersion.version}
-         |     clusterId         : ${cancelRequest.clusterId}
-         |     withDrain         : ${cancelRequest.withDrain}
-         |     nativeFormat      : ${cancelRequest.nativeFormat}
-         |     appId             : ${cancelRequest.clusterId}
-         |     jobId             : ${cancelRequest.jobId}
+         |----------------------------------------- spark job stop ----------------------------------
+         |     userSparkHome     : ${stopRequest.sparkVersion.sparkHome}
+         |     sparkVersion      : ${stopRequest.sparkVersion.version}
+         |     withDrain         : ${stopRequest.withDrain}
+         |     nativeFormat      : ${stopRequest.nativeFormat}
+         |     jobId             : ${stopRequest.jobId}
         |-------------------------------------------------------------------------------------------
          |""".stripMargin)
 
-    doCancel(cancelRequest)
+    doStop(stopRequest)
   }
 
   @throws[Exception]
   def doSubmit(submitRequest: SubmitRequest): SubmitResponse
 
   @throws[Exception]
-  def doCancel(cancelRequest: CancelRequest): CancelResponse
+  def doStop(stopRequest: StopRequest): StopResponse
 
 }
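
    Reviewer note: doSubmit and doStop stay abstract, so each client supplies
    the cluster-specific action. Below is a hypothetical sketch of the YARN
    side of a doStop, assuming the stop request can be resolved to a YARN
    application id such as "application_1700000000000_0001"; it uses the
    standard hadoop-yarn-client API, and StopResponse construction is omitted
    since its shape isn't shown in this diff.

        import org.apache.hadoop.yarn.api.records.ApplicationId
        import org.apache.hadoop.yarn.client.api.YarnClient
        import org.apache.hadoop.yarn.conf.YarnConfiguration

        object YarnStopSketch {
          // Ask the ResourceManager to kill the given application; this is the
          // YARN-level action a doStop implementation would ultimately perform.
          def killApplication(applicationId: String): Unit = {
            val yarnClient = YarnClient.createYarnClient()
            try {
              yarnClient.init(new YarnConfiguration())
              yarnClient.start()
              yarnClient.killApplication(ApplicationId.fromString(applicationId))
            } finally {
              yarnClient.stop()
            }
          }
        }

    The template method stop() above then only owns the logging, so every
    client reports a stop in the same format before its doStop runs.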
