This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 975ad4ff2 support spark task (#4102)
975ad4ff2 is described below

commit 975ad4ff233bb7228ca433b8e7305409929affe4
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Sun Sep 29 01:23:48 2024 +0800

    support spark task (#4102)
---
 .../SparkTaskItem.java}                            |  43 ++-----
 .../console/core/entity/FlinkApplication.java      |   4 +-
 .../console/core/enums/DistributedTaskEnum.java    |  12 +-
 .../core/service/DistributedTaskService.java       |  18 +--
 .../impl/SparkApplicationActionServiceImpl.java    |  32 ++++-
 .../service/impl/DistributedTaskServiceImpl.java   | 137 ++++++++++++++-------
 .../console/core/watcher/FlinkAppHttpWatcher.java  |  18 +--
 .../console/core/watcher/SparkAppHttpWatcher.java  |  28 +++--
 .../core/service/DistributedTaskServiceTest.java   |  27 +++-
 9 files changed, 201 insertions(+), 118 deletions(-)

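Editor's note: the heart of this commit is an HA guard that each Spark action method (start/stop/restart/revoke/forcedStop) now applies before doing any work: if the application is not owned by the local server, the action is persisted as a distributed task for the owning node instead of being executed. The sketch below is a simplified, hypothetical illustration of that pattern, not StreamPark's implementation: placement uses a plain hash-mod instead of the project's ConsistentHash, and the task table is an in-memory list; all names in it are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal, hypothetical sketch of the HA guard added to the Spark action methods.
// NOT the real StreamPark code: placement is hash-mod rather than ConsistentHash,
// and the "task table" is an in-memory list standing in for t_distributed_task.
public class HaGuardSketch {

    enum Action { START, RESTART, REVOKE, STOP, FORCED_STOP }

    record TaskRecord(long appId, boolean autoStart, Action action) {}

    private final List<String> servers;
    private final String localServerId;
    private final List<TaskRecord> taskTable = new ArrayList<>();

    HaGuardSketch(List<String> servers, String localServerId) {
        this.servers = servers;
        this.localServerId = localServerId;
    }

    // Simplified placement decision; the real code asks a consistent-hash ring
    // which server node owns the given application id.
    boolean isLocalProcessing(long appId) {
        int owner = (int) (Math.abs(appId) % servers.size());
        return servers.get(owner).equals(localServerId);
    }

    // Mirrors the guard now prepended to each Spark action in the diff below.
    void stop(long appId) {
        if (!isLocalProcessing(appId)) {
            // Delegate to the owning node by saving a distributed task record.
            taskTable.add(new TaskRecord(appId, false, Action.STOP));
            return;
        }
        System.out.println("Stopping Spark application " + appId + " locally");
    }

    public static void main(String[] args) {
        HaGuardSketch guard = new HaGuardSketch(List.of("server-1", "server-2"), "server-1");
        guard.stop(4L); // owned locally -> executed here
        guard.stop(5L); // owned by server-2 -> queued as a distributed task
        System.out.println("Queued tasks: " + guard.taskTable);
    }
}
```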
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/SparkTaskItem.java
similarity index 51%
copy from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
copy to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/SparkTaskItem.java
index e27a9ce95..c681a7f8c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/SparkTaskItem.java
@@ -15,45 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.enums;
+package org.apache.streampark.console.core.bean;
 
-/**
- * The DistributedTaskEnum represents the possible actions that can be performed on a task.
- */
-public enum DistributedTaskEnum {
-
-    /**
-     * Starts the specified application.
-     */
-    START(0),
-
-    /**
-     * Restarts the given application.
-     */
-    RESTART(1),
-
-    /**
-     * Revokes access for the given application.
-     */
-    REVOKE(2),
+import lombok.Data;
 
-    /**
-     * Cancels the given application. Throws an exception if cancellation fails.
-     */
-    CANCEL(3),
+import java.io.Serializable;
 
-    /**
-     * Forces the given application to stop.
-     */
-    ABORT(4);
+@Data
+public class SparkTaskItem implements Serializable {
 
-    private final int value;
+    /** appId */
+    private Long appId;
 
-    DistributedTaskEnum(int value) {
-        this.value = value;
-    }
+    private Boolean autoStart;
 
-    public int get() {
-        return this.value;
-    }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
index 382f71fef..3b9c86ffa 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
@@ -26,6 +26,7 @@ import org.apache.streampark.common.enums.FlinkJobType;
 import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.console.base.mybatis.entity.BaseEntity;
 import org.apache.streampark.console.base.util.JacksonUtils;
 import org.apache.streampark.console.core.bean.AppControl;
 import org.apache.streampark.console.core.bean.Dependency;
@@ -53,7 +54,6 @@ import lombok.Setter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
-import java.io.Serializable;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -64,7 +64,7 @@ import java.util.Optional;
 @Data
 @TableName("t_flink_app")
 @Slf4j
-public class FlinkApplication implements Serializable {
+public class FlinkApplication extends BaseEntity {
 
     @TableId(type = IdType.INPUT)
     private Long id;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
index e27a9ce95..f397a64fb 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
@@ -45,7 +45,17 @@ public enum DistributedTaskEnum {
     /**
      * Forces the given application to stop.
      */
-    ABORT(4);
+    ABORT(4),
+
+    /**
+     * Stop the given application.
+     */
+    STOP(5),
+
+    /**
+     * Forces the given application to stop.
+     */
+    FORCED_STOP(6);
 
     private final int value;
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
index a1730f5bd..0a60c4c1d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
@@ -17,13 +17,12 @@
 
 package org.apache.streampark.console.core.service;
 
+import org.apache.streampark.console.base.mybatis.entity.BaseEntity;
 import org.apache.streampark.console.core.entity.DistributedTask;
-import org.apache.streampark.console.core.entity.FlinkApplication;
 import org.apache.streampark.console.core.enums.DistributedTaskEnum;
 
 import com.baomidou.mybatisplus.extension.service.IService;
 
-import java.util.List;
 import java.util.Set;
 
 /**
@@ -40,16 +39,9 @@ public interface DistributedTaskService extends IService<DistributedTask> {
 
     /**
      * This interface is responsible for polling the database to retrieve task records and execute the corresponding operations.
-     * @param DistributedTask DistributedTask
+     * @param distributedTask distributedTask
      */
-    void executeDistributedTask(DistributedTask DistributedTask) throws Exception;
-
-    /**
-     * Through this interface, the watcher obtains the list of tasks that need to be monitored.
-     * @param applications List<Application>
-     * @return List<Application> List of tasks that need to be monitored
-     */
-    List<FlinkApplication> getMonitoredTaskList(List<FlinkApplication> applications);
+    void executeDistributedTask(DistributedTask distributedTask) throws Exception;
 
     /**
      * This interface handles task redistribution when server nodes are added.
@@ -74,9 +66,9 @@ public interface DistributedTaskService extends IService<DistributedTask> {
     /**
      * Save Distributed Task.
      *
-     * @param appParam  Application
+     * @param appParam It may be one of the following values: FlinkApplication, SparkApplication
      * @param autoStart boolean
      * @param action It may be one of the following values: START, RESTART, REVOKE, CANCEL, ABORT
      */
-    public void saveDistributedTask(FlinkApplication appParam, boolean autoStart, DistributedTaskEnum action);
+    public void saveDistributedTask(BaseEntity appParam, boolean autoStart, DistributedTaskEnum action);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index 4812ebbc1..40912e3be 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -39,11 +39,13 @@ import org.apache.streampark.console.core.entity.SparkApplicationLog;
 import org.apache.streampark.console.core.entity.SparkEnv;
 import org.apache.streampark.console.core.entity.SparkSql;
 import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
+import org.apache.streampark.console.core.enums.DistributedTaskEnum;
 import org.apache.streampark.console.core.enums.ReleaseStateEnum;
 import org.apache.streampark.console.core.enums.SparkAppStateEnum;
 import org.apache.streampark.console.core.enums.SparkOperationEnum;
 import org.apache.streampark.console.core.enums.SparkOptionStateEnum;
 import org.apache.streampark.console.core.mapper.SparkApplicationMapper;
+import org.apache.streampark.console.core.service.DistributedTaskService;
 import org.apache.streampark.console.core.service.ResourceService;
 import org.apache.streampark.console.core.service.SparkEnvService;
 import org.apache.streampark.console.core.service.SparkSqlService;
@@ -129,6 +131,9 @@ public class SparkApplicationActionServiceImpl
     @Autowired
     private ResourceService resourceService;
 
+    @Autowired
+    private DistributedTaskService distributedTaskService;
+
     private final Map<Long, CompletableFuture<SubmitResponse>> startJobFutureMap = new ConcurrentHashMap<>();
 
     private final Map<Long, CompletableFuture<CancelResponse>> cancelJobFutureMap = new ConcurrentHashMap<>();
@@ -136,6 +141,11 @@ public class SparkApplicationActionServiceImpl
     @Override
     public void revoke(Long appId) throws ApplicationException {
         SparkApplication application = getById(appId);
+        // For HA purposes, if the task is not processed locally, save the Distribution task and return
+        if (!distributedTaskService.isLocalProcessing(appId)) {
+            distributedTaskService.saveDistributedTask(application, false, DistributedTaskEnum.REVOKE);
+            return;
+        }
         ApiAlertException.throwIfNull(
             application, String.format("The application id=%s not found, revoke failed.", appId));
 
@@ -161,15 +171,25 @@ public class SparkApplicationActionServiceImpl
 
     @Override
     public void restart(SparkApplication appParam) throws Exception {
+        // For HA purposes, if the task is not processed locally, save the Distribution task and return
+        if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
+            distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.RESTART);
+            return;
+        }
         this.stop(appParam);
         this.start(appParam, false);
     }
 
     @Override
     public void forcedStop(Long id) {
+        SparkApplication application = this.baseMapper.selectApp(id);
+        // For HA purposes, if the task is not processed locally, save the Distribution task and return
+        if (!distributedTaskService.isLocalProcessing(id)) {
+            distributedTaskService.saveDistributedTask(application, false, DistributedTaskEnum.FORCED_STOP);
+            return;
+        }
         CompletableFuture<SubmitResponse> startFuture = startJobFutureMap.remove(id);
         CompletableFuture<CancelResponse> stopFuture = cancelJobFutureMap.remove(id);
-        SparkApplication application = this.baseMapper.selectApp(id);
         if (startFuture != null) {
             startFuture.cancel(true);
         }
@@ -183,6 +203,11 @@ public class SparkApplicationActionServiceImpl
 
     @Override
     public void stop(SparkApplication appParam) throws Exception {
+        // For HA purposes, if the task is not processed locally, save the Distribution task and return
+        if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
+            distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.STOP);
+            return;
+        }
         SparkAppHttpWatcher.setOptionState(appParam.getId(), SparkOptionStateEnum.STOPPING);
         SparkApplication application = getById(appParam.getId());
         application.setState(SparkAppStateEnum.STOPPING.getValue());
@@ -245,6 +270,11 @@ public class SparkApplicationActionServiceImpl
 
     @Override
     public void start(SparkApplication appParam, boolean auto) throws Exception {
+        // For HA purposes, if the task is not processed locally, save the Distribution task and return
+        if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
+            distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.START);
+            return;
+        }
         // 1) check application
         final SparkApplication application = getById(appParam.getId());
         AssertUtils.notNull(application);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
index 68fdf6b5a..dfff3440a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
@@ -17,16 +17,20 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.console.base.mybatis.entity.BaseEntity;
 import org.apache.streampark.console.base.util.ConsistentHash;
 import org.apache.streampark.console.base.util.JacksonUtils;
 import org.apache.streampark.console.core.bean.FlinkTaskItem;
+import org.apache.streampark.console.core.bean.SparkTaskItem;
 import org.apache.streampark.console.core.entity.DistributedTask;
 import org.apache.streampark.console.core.entity.FlinkApplication;
+import org.apache.streampark.console.core.entity.SparkApplication;
 import org.apache.streampark.console.core.enums.DistributedTaskEnum;
 import org.apache.streampark.console.core.enums.EngineTypeEnum;
 import org.apache.streampark.console.core.mapper.DistributedTaskMapper;
 import org.apache.streampark.console.core.service.DistributedTaskService;
 import org.apache.streampark.console.core.service.application.FlinkApplicationActionService;
+import org.apache.streampark.console.core.service.application.SparkApplicationActionService;
 
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -43,7 +47,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
-import java.util.stream.Collectors;
 
 @Slf4j
 @Service
@@ -57,7 +60,10 @@ public class DistributedTaskServiceImpl extends ServiceImpl<DistributedTaskMappe
     private Executor taskExecutor;
 
     @Autowired
-    private FlinkApplicationActionService applicationActionService;
+    private FlinkApplicationActionService flinkApplicationActionService;
+
+    @Autowired
+    private SparkApplicationActionService sparkApplicationActionService;
 
     /**
      * Server Id
@@ -112,47 +118,59 @@ public class DistributedTaskServiceImpl extends ServiceImpl<DistributedTaskMappe
 
     /**
      * This interface is responsible for polling the database to retrieve task records and execute the corresponding operations.
-     * @param DistributedTask DistributedTask
+     * @param distributedTask distributedTask
      */
     @Override
-    public void executeDistributedTask(DistributedTask DistributedTask) throws Exception {
+    public void executeDistributedTask(DistributedTask distributedTask) throws Exception {
         // Execute Distributed task
-        log.info("Execute Distributed task: {}", DistributedTask);
-        FlinkTaskItem flinkTaskItem = getFlinkTaskItem(DistributedTask);
-        FlinkApplication appParam = getAppByFlinkTaskItem(flinkTaskItem);
-        switch (DistributedTask.getAction()) {
-            case START:
-                applicationActionService.start(appParam, flinkTaskItem.getAutoStart());
-                break;
-            case RESTART:
-                applicationActionService.restart(appParam);
-                break;
-            case REVOKE:
-                applicationActionService.revoke(appParam.getId());
-                break;
-            case CANCEL:
-                applicationActionService.cancel(appParam);
-                break;
-            case ABORT:
-                applicationActionService.abort(appParam.getId());
-                break;
-            default:
-                log.error("Unsupported task: {}", DistributedTask.getAction());
+        log.info("Execute Distributed task: {}", distributedTask);
+        if (distributedTask.getEngineType() == EngineTypeEnum.FLINK) {
+            FlinkTaskItem flinkTaskItem = getFlinkTaskItem(distributedTask);
+            FlinkApplication appParam = getAppByFlinkTaskItem(flinkTaskItem);
+            switch (distributedTask.getAction()) {
+                case START:
+                    flinkApplicationActionService.start(appParam, flinkTaskItem.getAutoStart());
+                    break;
+                case RESTART:
+                    flinkApplicationActionService.restart(appParam);
+                    break;
+                case REVOKE:
+                    flinkApplicationActionService.revoke(appParam.getId());
+                    break;
+                case CANCEL:
+                    flinkApplicationActionService.cancel(appParam);
+                    break;
+                case ABORT:
+                    flinkApplicationActionService.abort(appParam.getId());
+                    break;
+                default:
+                    log.error("Unsupported flink task action: {}", 
distributedTask.getAction());
+            }
+        } else if (distributedTask.getEngineType() == EngineTypeEnum.SPARK) {
+            SparkTaskItem sparkTaskItem = getSparkTaskItem(distributedTask);
+            SparkApplication appParam = getAppBySparkTaskItem(sparkTaskItem);
+            switch (distributedTask.getAction()) {
+                case START:
+                    sparkApplicationActionService.start(appParam, sparkTaskItem.getAutoStart());
+                    break;
+                case RESTART:
+                    sparkApplicationActionService.restart(appParam);
+                    break;
+                case REVOKE:
+                    sparkApplicationActionService.revoke(appParam.getId());
+                    break;
+                case STOP:
+                    sparkApplicationActionService.stop(appParam);
+                    break;
+                case FORCED_STOP:
+                    sparkApplicationActionService.forcedStop(appParam.getId());
+                    break;
+                default:
+                    log.error("Unsupported spark task action: {}", 
distributedTask.getAction());
+            }
         }
     }
 
-    /**
-     * Through this interface, the watcher obtains the list of tasks that need to be monitored.
-     * @param applications List<Application>
-     * @return List<Application> List of tasks that need to be monitored
-     */
-    @Override
-    public List<FlinkApplication> getMonitoredTaskList(List<FlinkApplication> applications) {
-        return applications.stream()
-            .filter(application -> isLocalProcessing(application.getId()))
-            .collect(Collectors.toList());
-    }
-
     /**
      * This interface handles task redistribution when server nodes are added.
      *
@@ -192,17 +210,25 @@ public class DistributedTaskServiceImpl extends ServiceImpl<DistributedTaskMappe
      * @param action It may be one of the following values: START, RESTART, REVOKE, CANCEL, ABORT
      */
     @Override
-    public void saveDistributedTask(FlinkApplication appParam, boolean autoStart, DistributedTaskEnum action) {
+    public void saveDistributedTask(BaseEntity appParam, boolean autoStart, DistributedTaskEnum action) {
         try {
-            DistributedTask DistributedTask = getDistributedTaskByApp(appParam, autoStart, action);
-            this.save(DistributedTask);
+            DistributedTask distributedTask;
+            if (appParam instanceof FlinkApplication) {
+                distributedTask = getDistributedTaskByFlinkApp((FlinkApplication) appParam, autoStart, action);
+            } else if (appParam instanceof SparkApplication) {
+                distributedTask = getDistributedTaskBySparkApp((SparkApplication) appParam, autoStart, action);
+            } else {
+                log.error("Unsupported application type: {}", appParam.getClass().getName());
+                return;
+            }
+            this.save(distributedTask);
         } catch (JsonProcessingException e) {
             log.error("Failed to save Distributed task: {}", e.getMessage());
         }
     }
 
-    public DistributedTask getDistributedTaskByApp(FlinkApplication appParam, boolean autoStart,
-                                                   DistributedTaskEnum action) throws JsonProcessingException {
+    public DistributedTask getDistributedTaskByFlinkApp(FlinkApplication appParam, boolean autoStart,
+                                                        DistributedTaskEnum action) throws JsonProcessingException {
         FlinkTaskItem flinkTaskItem = new FlinkTaskItem();
         flinkTaskItem.setAppId(appParam.getId());
         flinkTaskItem.setAutoStart(autoStart);
@@ -221,8 +247,25 @@ public class DistributedTaskServiceImpl extends ServiceImpl<DistributedTaskMappe
         return distributedTask;
     }
 
-    public FlinkTaskItem getFlinkTaskItem(DistributedTask DistributedTask) throws JsonProcessingException {
-        return JacksonUtils.read(DistributedTask.getProperties(), FlinkTaskItem.class);
+    public DistributedTask getDistributedTaskBySparkApp(SparkApplication appParam, boolean autoStart,
+                                                        DistributedTaskEnum action) throws JsonProcessingException {
+        SparkTaskItem sparkTaskItem = new SparkTaskItem();
+        sparkTaskItem.setAppId(appParam.getId());
+        sparkTaskItem.setAutoStart(autoStart);
+
+        DistributedTask distributedTask = new DistributedTask();
+        distributedTask.setAction(action);
+        distributedTask.setEngineType(EngineTypeEnum.SPARK);
+        distributedTask.setProperties(JacksonUtils.write(sparkTaskItem));
+        return distributedTask;
+    }
+
+    public FlinkTaskItem getFlinkTaskItem(DistributedTask distributedTask) throws JsonProcessingException {
+        return JacksonUtils.read(distributedTask.getProperties(), FlinkTaskItem.class);
+    }
+
+    public SparkTaskItem getSparkTaskItem(DistributedTask distributedTask) throws JsonProcessingException {
+        return JacksonUtils.read(distributedTask.getProperties(), SparkTaskItem.class);
     }
 
     public FlinkApplication getAppByFlinkTaskItem(FlinkTaskItem flinkTaskItem) {
@@ -238,6 +281,12 @@ public class DistributedTaskServiceImpl extends ServiceImpl<DistributedTaskMappe
         return appParam;
     }
 
+    public SparkApplication getAppBySparkTaskItem(SparkTaskItem sparkTaskItem) {
+        SparkApplication appParam = new SparkApplication();
+        appParam.setId(sparkTaskItem.getAppId());
+        return appParam;
+    }
+
     public long getConsistentHashSize() {
         return consistentHash.getSize();
     }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index 7b19f853a..642e09b10 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -180,15 +180,19 @@ public class FlinkAppHttpWatcher {
     @PostConstruct
     public void init() {
         WATCHING_APPS.clear();
-        List<FlinkApplication> applications = distributedTaskService.getMonitoredTaskList(applicationManageService.list(
+        List<FlinkApplication> applications = applicationManageService.list(
             new LambdaQueryWrapper<FlinkApplication>()
                 .eq(FlinkApplication::getTracking, 1)
-                .notIn(FlinkApplication::getDeployMode, FlinkDeployMode.getKubernetesMode())));
-        applications.forEach(
-            (app) -> {
-                WATCHING_APPS.put(app.getId(), app);
-                STARTING_CACHE.put(app.getId(), DEFAULT_FLAG_BYTE);
-            });
+                .notIn(FlinkApplication::getDeployMode, FlinkDeployMode.getKubernetesMode()))
+            .stream()
+            .filter(application -> distributedTaskService.isLocalProcessing(application.getId()))
+            .collect(Collectors.toList());
+
+        applications.forEach(app -> {
+            Long appId = app.getId();
+            WATCHING_APPS.put(appId, app);
+            STARTING_CACHE.put(appId, DEFAULT_FLAG_BYTE);
+        });
     }
 
     @PreDestroy
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
index b2c1ed083..696864687 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
@@ -28,6 +28,7 @@ import org.apache.streampark.console.core.enums.StopFromEnum;
 import org.apache.streampark.console.core.metrics.spark.Job;
 import org.apache.streampark.console.core.metrics.spark.SparkApplicationSummary;
 import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.DistributedTaskService;
 import org.apache.streampark.console.core.service.SparkEnvService;
 import org.apache.streampark.console.core.service.alert.AlertService;
 import org.apache.streampark.console.core.service.application.SparkApplicationActionService;
@@ -64,6 +65,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 @Slf4j
 @Component
@@ -84,6 +86,9 @@ public class SparkAppHttpWatcher {
     @Autowired
     private SparkEnvService sparkEnvService;
 
+    @Autowired
+    private DistributedTaskService distributedTaskService;
+
     @Autowired
     private AlertService alertService;
 
@@ -134,16 +139,19 @@ public class SparkAppHttpWatcher {
     @PostConstruct
     public void init() {
         WATCHING_APPS.clear();
-        List<SparkApplication> applications =
-            applicationManageService.list(
-                new LambdaQueryWrapper<SparkApplication>()
-                    .eq(SparkApplication::getTracking, 1)
-                    .ne(SparkApplication::getState, SparkAppStateEnum.LOST.getValue()));
-        applications.forEach(
-            (app) -> {
-                WATCHING_APPS.put(app.getId(), app);
-                STARTING_CACHE.put(app.getId(), DEFAULT_FLAG_BYTE);
-            });
+        List<SparkApplication> applications = applicationManageService.list(
+            new LambdaQueryWrapper<SparkApplication>()
+                .eq(SparkApplication::getTracking, 1)
+                .ne(SparkApplication::getState, SparkAppStateEnum.LOST.getValue()))
+            .stream()
+            .filter(application -> distributedTaskService.isLocalProcessing(application.getId()))
+            .collect(Collectors.toList());
+
+        applications.forEach(app -> {
+            Long appId = app.getId();
+            WATCHING_APPS.put(appId, app);
+            STARTING_CACHE.put(appId, DEFAULT_FLAG_BYTE);
+        });
     }
 
     @PreDestroy
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
index 58a5a15f9..5e1b3adf8 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
@@ -18,8 +18,10 @@
 package org.apache.streampark.console.core.service;
 
 import org.apache.streampark.console.core.bean.FlinkTaskItem;
+import org.apache.streampark.console.core.bean.SparkTaskItem;
 import org.apache.streampark.console.core.entity.DistributedTask;
 import org.apache.streampark.console.core.entity.FlinkApplication;
+import org.apache.streampark.console.core.entity.SparkApplication;
 import org.apache.streampark.console.core.enums.DistributedTaskEnum;
 import org.apache.streampark.console.core.service.impl.DistributedTaskServiceImpl;
 
@@ -57,17 +59,32 @@ class DistributedTaskServiceTest {
     }
 
     @Test
-    void testGetTaskAndApp() {
+    void testFlinkTaskAndApp() {
         FlinkApplication application = new FlinkApplication();
         application.setId(0L);
         try {
-            DistributedTask DistributedTask =
-                distributionTaskService.getDistributedTaskByApp(application, false, DistributedTaskEnum.START);
-            FlinkTaskItem flinkTaskItem = distributionTaskService.getFlinkTaskItem(DistributedTask);
+            DistributedTask distributedTask =
+                distributionTaskService.getDistributedTaskByFlinkApp(application, false, DistributedTaskEnum.START);
+            FlinkTaskItem flinkTaskItem = distributionTaskService.getFlinkTaskItem(distributedTask);
             FlinkApplication newApplication = distributionTaskService.getAppByFlinkTaskItem(flinkTaskItem);
             assert (application.equals(newApplication));
         } catch (JacksonException e) {
-            log.error("testGetTaskAndApp failed:", e);
+            log.error("testFlinkTaskAndApp failed:", e);
+        }
+    }
+
+    @Test
+    void testSparkTaskAndApp() {
+        SparkApplication application = new SparkApplication();
+        application.setId(0L);
+        try {
+            DistributedTask distributedTask =
+                distributionTaskService.getDistributedTaskBySparkApp(application, false, DistributedTaskEnum.START);
+            SparkTaskItem sparkTaskItem = distributionTaskService.getSparkTaskItem(distributedTask);
+            SparkApplication newApplication = distributionTaskService.getAppBySparkTaskItem(sparkTaskItem);
+            assert (application.equals(newApplication));
+        } catch (JacksonException e) {
+            log.error("testSparkTaskAndApp failed:", e);
         }
     }
 

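Editor's note: the new test above exercises a JSON round-trip: a SparkTaskItem is serialized into the properties of a DistributedTask and read back before the action is dispatched on the owning node. The fragment below reproduces that round-trip with plain Jackson (ObjectMapper); the StreamPark code uses its own JacksonUtils wrapper, so treat this as an illustrative approximation rather than the project's API, and the class and field names here are simplified local copies.

```java
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical stand-alone round-trip of the SparkTaskItem payload, approximating
// what getDistributedTaskBySparkApp / getSparkTaskItem do via JacksonUtils.
public class SparkTaskItemRoundTrip {

    // Simplified local copy of the bean added by this commit (appId + autoStart).
    public static class SparkTaskItem {
        public Long appId;
        public Boolean autoStart;
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        SparkTaskItem item = new SparkTaskItem();
        item.appId = 42L;
        item.autoStart = false;

        // Serialize: this string is what would be stored in the task's properties column.
        String properties = mapper.writeValueAsString(item);
        System.out.println("properties = " + properties);

        // Deserialize on the node that later picks the task up and dispatches the action.
        SparkTaskItem restored = mapper.readValue(properties, SparkTaskItem.class);
        System.out.println("appId=" + restored.appId + ", autoStart=" + restored.autoStart);
    }
}
```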