This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 975ad4ff2 support spark task (#4102)
975ad4ff2 is described below
commit 975ad4ff233bb7228ca433b8e7305409929affe4
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Sun Sep 29 01:23:48 2024 +0800
support spark task (#4102)
---
.../SparkTaskItem.java} | 43 ++-----
.../console/core/entity/FlinkApplication.java | 4 +-
.../console/core/enums/DistributedTaskEnum.java | 12 +-
.../core/service/DistributedTaskService.java | 18 +--
.../impl/SparkApplicationActionServiceImpl.java | 32 ++++-
.../service/impl/DistributedTaskServiceImpl.java | 137 ++++++++++++++-------
.../console/core/watcher/FlinkAppHttpWatcher.java | 18 +--
.../console/core/watcher/SparkAppHttpWatcher.java | 28 +++--
.../core/service/DistributedTaskServiceTest.java | 27 +++-
9 files changed, 201 insertions(+), 118 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/SparkTaskItem.java
similarity index 51%
copy from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
copy to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/SparkTaskItem.java
index e27a9ce95..c681a7f8c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/SparkTaskItem.java
@@ -15,45 +15,18 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.enums;
+package org.apache.streampark.console.core.bean;
-/**
- * The DistributedTaskEnum represents the possible actions that can be
performed on a task.
- */
-public enum DistributedTaskEnum {
-
- /**
- * Starts the specified application.
- */
- START(0),
-
- /**
- * Restarts the given application.
- */
- RESTART(1),
-
- /**
- * Revokes access for the given application.
- */
- REVOKE(2),
+import lombok.Data;
- /**
- * Cancels the given application. Throws an exception if cancellation
fails.
- */
- CANCEL(3),
+import java.io.Serializable;
- /**
- * Forces the given application to stop.
- */
- ABORT(4);
+@Data
+public class SparkTaskItem implements Serializable {
- private final int value;
+ /** appId */
+ private Long appId;
- DistributedTaskEnum(int value) {
- this.value = value;
- }
+ private Boolean autoStart;
- public int get() {
- return this.value;
- }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
index 382f71fef..3b9c86ffa 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
@@ -26,6 +26,7 @@ import org.apache.streampark.common.enums.FlinkJobType;
import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.console.base.mybatis.entity.BaseEntity;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.core.bean.AppControl;
import org.apache.streampark.console.core.bean.Dependency;
@@ -53,7 +54,6 @@ import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import java.io.Serializable;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -64,7 +64,7 @@ import java.util.Optional;
@Data
@TableName("t_flink_app")
@Slf4j
-public class FlinkApplication implements Serializable {
+public class FlinkApplication extends BaseEntity {
@TableId(type = IdType.INPUT)
private Long id;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
index e27a9ce95..f397a64fb 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
@@ -45,7 +45,17 @@ public enum DistributedTaskEnum {
/**
* Forces the given application to stop.
*/
- ABORT(4);
+ ABORT(4),
+
+ /**
+ * Stops the given Spark application.
+ */
+ STOP(5),
+
+ /**
+ * Forces the given Spark application to stop (Flink applications use ABORT).
+ */
+ FORCED_STOP(6);
private final int value;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
index a1730f5bd..0a60c4c1d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
@@ -17,13 +17,12 @@
package org.apache.streampark.console.core.service;
+import org.apache.streampark.console.base.mybatis.entity.BaseEntity;
import org.apache.streampark.console.core.entity.DistributedTask;
-import org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.enums.DistributedTaskEnum;
import com.baomidou.mybatisplus.extension.service.IService;
-import java.util.List;
import java.util.Set;
/**
@@ -40,16 +39,9 @@ public interface DistributedTaskService extends
IService<DistributedTask> {
/**
* This interface is responsible for polling the database to retrieve task
records and execute the corresponding operations.
- * @param DistributedTask DistributedTask
+ * @param distributedTask distributedTask
*/
- void executeDistributedTask(DistributedTask DistributedTask) throws
Exception;
-
- /**
- * Through this interface, the watcher obtains the list of tasks that need
to be monitored.
- * @param applications List<Application>
- * @return List<Application> List of tasks that need to be monitored
- */
- List<FlinkApplication> getMonitoredTaskList(List<FlinkApplication>
applications);
+ void executeDistributedTask(DistributedTask distributedTask) throws
Exception;
/**
* This interface handles task redistribution when server nodes are added.
@@ -74,9 +66,9 @@ public interface DistributedTaskService extends
IService<DistributedTask> {
/**
* Save Distributed Task.
*
- * @param appParam Application
+ * @param appParam The application to act on; one of the following types:
FlinkApplication, SparkApplication
* @param autoStart boolean
* @param action It may be one of the following values: START, RESTART,
REVOKE, CANCEL, ABORT
*/
- public void saveDistributedTask(FlinkApplication appParam, boolean
autoStart, DistributedTaskEnum action);
+ public void saveDistributedTask(BaseEntity appParam, boolean autoStart,
DistributedTaskEnum action);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index 4812ebbc1..40912e3be 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -39,11 +39,13 @@ import
org.apache.streampark.console.core.entity.SparkApplicationLog;
import org.apache.streampark.console.core.entity.SparkEnv;
import org.apache.streampark.console.core.entity.SparkSql;
import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
+import org.apache.streampark.console.core.enums.DistributedTaskEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.enums.SparkAppStateEnum;
import org.apache.streampark.console.core.enums.SparkOperationEnum;
import org.apache.streampark.console.core.enums.SparkOptionStateEnum;
import org.apache.streampark.console.core.mapper.SparkApplicationMapper;
+import org.apache.streampark.console.core.service.DistributedTaskService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.SparkEnvService;
import org.apache.streampark.console.core.service.SparkSqlService;
@@ -129,6 +131,9 @@ public class SparkApplicationActionServiceImpl
@Autowired
private ResourceService resourceService;
+ @Autowired
+ private DistributedTaskService distributedTaskService;
+
private final Map<Long, CompletableFuture<SubmitResponse>>
startJobFutureMap = new ConcurrentHashMap<>();
private final Map<Long, CompletableFuture<CancelResponse>>
cancelJobFutureMap = new ConcurrentHashMap<>();
@@ -136,6 +141,11 @@ public class SparkApplicationActionServiceImpl
@Override
public void revoke(Long appId) throws ApplicationException {
SparkApplication application = getById(appId);
+ // For HA purposes, if the task is not processed locally, save it as a
distributed task and return
+ if (!distributedTaskService.isLocalProcessing(appId)) {
+ distributedTaskService.saveDistributedTask(application, false,
DistributedTaskEnum.REVOKE);
+ return;
+ }
ApiAlertException.throwIfNull(
application, String.format("The application id=%s not found,
revoke failed.", appId));
@@ -161,15 +171,25 @@ public class SparkApplicationActionServiceImpl
@Override
public void restart(SparkApplication appParam) throws Exception {
+ // For HA purposes, if the task is not processed locally, save it as a
distributed task and return
+ if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
+ distributedTaskService.saveDistributedTask(appParam, false,
DistributedTaskEnum.RESTART);
+ return;
+ }
this.stop(appParam);
this.start(appParam, false);
}
@Override
public void forcedStop(Long id) {
+ SparkApplication application = this.baseMapper.selectApp(id);
+ // For HA purposes, if the task is not processed locally, save it as a
distributed task and return
+ if (!distributedTaskService.isLocalProcessing(id)) {
+ distributedTaskService.saveDistributedTask(application, false,
DistributedTaskEnum.FORCED_STOP);
+ return;
+ }
CompletableFuture<SubmitResponse> startFuture =
startJobFutureMap.remove(id);
CompletableFuture<CancelResponse> stopFuture =
cancelJobFutureMap.remove(id);
- SparkApplication application = this.baseMapper.selectApp(id);
if (startFuture != null) {
startFuture.cancel(true);
}
@@ -183,6 +203,11 @@ public class SparkApplicationActionServiceImpl
@Override
public void stop(SparkApplication appParam) throws Exception {
+ // For HA purposes, if the task is not processed locally, save it as a
distributed task and return
+ if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
+ distributedTaskService.saveDistributedTask(appParam, false,
DistributedTaskEnum.STOP);
+ return;
+ }
SparkAppHttpWatcher.setOptionState(appParam.getId(),
SparkOptionStateEnum.STOPPING);
SparkApplication application = getById(appParam.getId());
application.setState(SparkAppStateEnum.STOPPING.getValue());
@@ -245,6 +270,11 @@ public class SparkApplicationActionServiceImpl
@Override
public void start(SparkApplication appParam, boolean auto) throws
Exception {
+ // For HA purposes, if the task is not processed locally, save it as a
distributed task and return
+ if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
+ distributedTaskService.saveDistributedTask(appParam, false,
DistributedTaskEnum.START);
+ return;
+ }
// 1) check application
final SparkApplication application = getById(appParam.getId());
AssertUtils.notNull(application);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
index 68fdf6b5a..dfff3440a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
@@ -17,16 +17,20 @@
package org.apache.streampark.console.core.service.impl;
+import org.apache.streampark.console.base.mybatis.entity.BaseEntity;
import org.apache.streampark.console.base.util.ConsistentHash;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.core.bean.FlinkTaskItem;
+import org.apache.streampark.console.core.bean.SparkTaskItem;
import org.apache.streampark.console.core.entity.DistributedTask;
import org.apache.streampark.console.core.entity.FlinkApplication;
+import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.enums.DistributedTaskEnum;
import org.apache.streampark.console.core.enums.EngineTypeEnum;
import org.apache.streampark.console.core.mapper.DistributedTaskMapper;
import org.apache.streampark.console.core.service.DistributedTaskService;
import
org.apache.streampark.console.core.service.application.FlinkApplicationActionService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationActionService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -43,7 +47,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
-import java.util.stream.Collectors;
@Slf4j
@Service
@@ -57,7 +60,10 @@ public class DistributedTaskServiceImpl extends
ServiceImpl<DistributedTaskMappe
private Executor taskExecutor;
@Autowired
- private FlinkApplicationActionService applicationActionService;
+ private FlinkApplicationActionService flinkApplicationActionService;
+
+ @Autowired
+ private SparkApplicationActionService sparkApplicationActionService;
/**
* Server Id
@@ -112,47 +118,59 @@ public class DistributedTaskServiceImpl extends
ServiceImpl<DistributedTaskMappe
/**
* This interface is responsible for polling the database to retrieve task
records and execute the corresponding operations.
- * @param DistributedTask DistributedTask
+ * @param distributedTask distributedTask
*/
@Override
- public void executeDistributedTask(DistributedTask DistributedTask) throws
Exception {
+ public void executeDistributedTask(DistributedTask distributedTask) throws
Exception {
// Execute Distributed task
- log.info("Execute Distributed task: {}", DistributedTask);
- FlinkTaskItem flinkTaskItem = getFlinkTaskItem(DistributedTask);
- FlinkApplication appParam = getAppByFlinkTaskItem(flinkTaskItem);
- switch (DistributedTask.getAction()) {
- case START:
- applicationActionService.start(appParam,
flinkTaskItem.getAutoStart());
- break;
- case RESTART:
- applicationActionService.restart(appParam);
- break;
- case REVOKE:
- applicationActionService.revoke(appParam.getId());
- break;
- case CANCEL:
- applicationActionService.cancel(appParam);
- break;
- case ABORT:
- applicationActionService.abort(appParam.getId());
- break;
- default:
- log.error("Unsupported task: {}", DistributedTask.getAction());
+ log.info("Execute Distributed task: {}", distributedTask);
+ if (distributedTask.getEngineType() == EngineTypeEnum.FLINK) {
+ FlinkTaskItem flinkTaskItem = getFlinkTaskItem(distributedTask);
+ FlinkApplication appParam = getAppByFlinkTaskItem(flinkTaskItem);
+ switch (distributedTask.getAction()) {
+ case START:
+ flinkApplicationActionService.start(appParam,
flinkTaskItem.getAutoStart());
+ break;
+ case RESTART:
+ flinkApplicationActionService.restart(appParam);
+ break;
+ case REVOKE:
+ flinkApplicationActionService.revoke(appParam.getId());
+ break;
+ case CANCEL:
+ flinkApplicationActionService.cancel(appParam);
+ break;
+ case ABORT:
+ flinkApplicationActionService.abort(appParam.getId());
+ break;
+ default:
+ log.error("Unsupported flink task action: {}",
distributedTask.getAction());
+ }
+ } else if (distributedTask.getEngineType() == EngineTypeEnum.SPARK) {
+ SparkTaskItem sparkTaskItem = getSparkTaskItem(distributedTask);
+ SparkApplication appParam = getAppBySparkTaskItem(sparkTaskItem);
+ switch (distributedTask.getAction()) {
+ case START:
+ sparkApplicationActionService.start(appParam,
sparkTaskItem.getAutoStart());
+ break;
+ case RESTART:
+ sparkApplicationActionService.restart(appParam);
+ break;
+ case REVOKE:
+ sparkApplicationActionService.revoke(appParam.getId());
+ break;
+ case STOP:
+ sparkApplicationActionService.stop(appParam);
+ break;
+ case FORCED_STOP:
+ sparkApplicationActionService.forcedStop(appParam.getId());
+ break;
+ default:
+ log.error("Unsupported spark task action: {}",
distributedTask.getAction());
+ }
}
}
- /**
- * Through this interface, the watcher obtains the list of tasks that need
to be monitored.
- * @param applications List<Application>
- * @return List<Application> List of tasks that need to be monitored
- */
- @Override
- public List<FlinkApplication> getMonitoredTaskList(List<FlinkApplication>
applications) {
- return applications.stream()
- .filter(application -> isLocalProcessing(application.getId()))
- .collect(Collectors.toList());
- }
-
/**
* This interface handles task redistribution when server nodes are added.
*
@@ -192,17 +210,25 @@ public class DistributedTaskServiceImpl extends
ServiceImpl<DistributedTaskMappe
* @param action It may be one of the following values: START, RESTART,
REVOKE, CANCEL, ABORT
*/
@Override
- public void saveDistributedTask(FlinkApplication appParam, boolean
autoStart, DistributedTaskEnum action) {
+ public void saveDistributedTask(BaseEntity appParam, boolean autoStart,
DistributedTaskEnum action) {
try {
- DistributedTask DistributedTask =
getDistributedTaskByApp(appParam, autoStart, action);
- this.save(DistributedTask);
+ DistributedTask distributedTask;
+ if (appParam instanceof FlinkApplication) {
+ distributedTask =
getDistributedTaskByFlinkApp((FlinkApplication) appParam, autoStart, action);
+ } else if (appParam instanceof SparkApplication) {
+ distributedTask =
getDistributedTaskBySparkApp((SparkApplication) appParam, autoStart, action);
+ } else {
+ log.error("Unsupported application type: {}",
appParam.getClass().getName());
+ return;
+ }
+ this.save(distributedTask);
} catch (JsonProcessingException e) {
log.error("Failed to save Distributed task: {}", e.getMessage());
}
}
- public DistributedTask getDistributedTaskByApp(FlinkApplication appParam,
boolean autoStart,
- DistributedTaskEnum action)
throws JsonProcessingException {
+ public DistributedTask getDistributedTaskByFlinkApp(FlinkApplication
appParam, boolean autoStart,
+ DistributedTaskEnum
action) throws JsonProcessingException {
FlinkTaskItem flinkTaskItem = new FlinkTaskItem();
flinkTaskItem.setAppId(appParam.getId());
flinkTaskItem.setAutoStart(autoStart);
@@ -221,8 +247,25 @@ public class DistributedTaskServiceImpl extends
ServiceImpl<DistributedTaskMappe
return distributedTask;
}
- public FlinkTaskItem getFlinkTaskItem(DistributedTask DistributedTask)
throws JsonProcessingException {
- return JacksonUtils.read(DistributedTask.getProperties(),
FlinkTaskItem.class);
+ public DistributedTask getDistributedTaskBySparkApp(SparkApplication
appParam, boolean autoStart,
+ DistributedTaskEnum
action) throws JsonProcessingException {
+ SparkTaskItem sparkTaskItem = new SparkTaskItem();
+ sparkTaskItem.setAppId(appParam.getId());
+ sparkTaskItem.setAutoStart(autoStart);
+
+ DistributedTask distributedTask = new DistributedTask();
+ distributedTask.setAction(action);
+ distributedTask.setEngineType(EngineTypeEnum.SPARK);
+ distributedTask.setProperties(JacksonUtils.write(sparkTaskItem));
+ return distributedTask;
+ }
+
+ public FlinkTaskItem getFlinkTaskItem(DistributedTask distributedTask)
throws JsonProcessingException {
+ return JacksonUtils.read(distributedTask.getProperties(),
FlinkTaskItem.class);
+ }
+
+ public SparkTaskItem getSparkTaskItem(DistributedTask distributedTask)
throws JsonProcessingException {
+ return JacksonUtils.read(distributedTask.getProperties(),
SparkTaskItem.class);
}
public FlinkApplication getAppByFlinkTaskItem(FlinkTaskItem flinkTaskItem)
{
@@ -238,6 +281,12 @@ public class DistributedTaskServiceImpl extends
ServiceImpl<DistributedTaskMappe
return appParam;
}
+ public SparkApplication getAppBySparkTaskItem(SparkTaskItem sparkTaskItem)
{
+ SparkApplication appParam = new SparkApplication();
+ appParam.setId(sparkTaskItem.getAppId());
+ return appParam;
+ }
+
public long getConsistentHashSize() {
return consistentHash.getSize();
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index 7b19f853a..642e09b10 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -180,15 +180,19 @@ public class FlinkAppHttpWatcher {
@PostConstruct
public void init() {
WATCHING_APPS.clear();
- List<FlinkApplication> applications =
distributedTaskService.getMonitoredTaskList(applicationManageService.list(
+ List<FlinkApplication> applications = applicationManageService.list(
new LambdaQueryWrapper<FlinkApplication>()
.eq(FlinkApplication::getTracking, 1)
- .notIn(FlinkApplication::getDeployMode,
FlinkDeployMode.getKubernetesMode())));
- applications.forEach(
- (app) -> {
- WATCHING_APPS.put(app.getId(), app);
- STARTING_CACHE.put(app.getId(), DEFAULT_FLAG_BYTE);
- });
+ .notIn(FlinkApplication::getDeployMode,
FlinkDeployMode.getKubernetesMode()))
+ .stream()
+ .filter(application ->
distributedTaskService.isLocalProcessing(application.getId()))
+ .collect(Collectors.toList());
+
+ applications.forEach(app -> {
+ Long appId = app.getId();
+ WATCHING_APPS.put(appId, app);
+ STARTING_CACHE.put(appId, DEFAULT_FLAG_BYTE);
+ });
}
@PreDestroy
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
index b2c1ed083..696864687 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
@@ -28,6 +28,7 @@ import org.apache.streampark.console.core.enums.StopFromEnum;
import org.apache.streampark.console.core.metrics.spark.Job;
import
org.apache.streampark.console.core.metrics.spark.SparkApplicationSummary;
import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.DistributedTaskService;
import org.apache.streampark.console.core.service.SparkEnvService;
import org.apache.streampark.console.core.service.alert.AlertService;
import
org.apache.streampark.console.core.service.application.SparkApplicationActionService;
@@ -64,6 +65,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
@Slf4j
@Component
@@ -84,6 +86,9 @@ public class SparkAppHttpWatcher {
@Autowired
private SparkEnvService sparkEnvService;
+ @Autowired
+ private DistributedTaskService distributedTaskService;
+
@Autowired
private AlertService alertService;
@@ -134,16 +139,19 @@ public class SparkAppHttpWatcher {
@PostConstruct
public void init() {
WATCHING_APPS.clear();
- List<SparkApplication> applications =
- applicationManageService.list(
- new LambdaQueryWrapper<SparkApplication>()
- .eq(SparkApplication::getTracking, 1)
- .ne(SparkApplication::getState,
SparkAppStateEnum.LOST.getValue()));
- applications.forEach(
- (app) -> {
- WATCHING_APPS.put(app.getId(), app);
- STARTING_CACHE.put(app.getId(), DEFAULT_FLAG_BYTE);
- });
+ List<SparkApplication> applications = applicationManageService.list(
+ new LambdaQueryWrapper<SparkApplication>()
+ .eq(SparkApplication::getTracking, 1)
+ .ne(SparkApplication::getState,
SparkAppStateEnum.LOST.getValue()))
+ .stream()
+ .filter(application ->
distributedTaskService.isLocalProcessing(application.getId()))
+ .collect(Collectors.toList());
+
+ applications.forEach(app -> {
+ Long appId = app.getId();
+ WATCHING_APPS.put(appId, app);
+ STARTING_CACHE.put(appId, DEFAULT_FLAG_BYTE);
+ });
}
@PreDestroy
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
index 58a5a15f9..5e1b3adf8 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
@@ -18,8 +18,10 @@
package org.apache.streampark.console.core.service;
import org.apache.streampark.console.core.bean.FlinkTaskItem;
+import org.apache.streampark.console.core.bean.SparkTaskItem;
import org.apache.streampark.console.core.entity.DistributedTask;
import org.apache.streampark.console.core.entity.FlinkApplication;
+import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.enums.DistributedTaskEnum;
import
org.apache.streampark.console.core.service.impl.DistributedTaskServiceImpl;
@@ -57,17 +59,32 @@ class DistributedTaskServiceTest {
}
@Test
- void testGetTaskAndApp() {
+ void testFlinkTaskAndApp() {
FlinkApplication application = new FlinkApplication();
application.setId(0L);
try {
- DistributedTask DistributedTask =
- distributionTaskService.getDistributedTaskByApp(application,
false, DistributedTaskEnum.START);
- FlinkTaskItem flinkTaskItem =
distributionTaskService.getFlinkTaskItem(DistributedTask);
+ DistributedTask distributedTask =
+
distributionTaskService.getDistributedTaskByFlinkApp(application, false,
DistributedTaskEnum.START);
+ FlinkTaskItem flinkTaskItem =
distributionTaskService.getFlinkTaskItem(distributedTask);
FlinkApplication newApplication =
distributionTaskService.getAppByFlinkTaskItem(flinkTaskItem);
assert (application.equals(newApplication));
} catch (JacksonException e) {
- log.error("testGetTaskAndApp failed:", e);
+ log.error("testFlinkTaskAndApp failed:", e);
+ }
+ }
+
+ @Test
+ void testSparkTaskAndApp() {
+ SparkApplication application = new SparkApplication();
+ application.setId(0L);
+ try {
+ DistributedTask distributedTask =
+
distributionTaskService.getDistributedTaskBySparkApp(application, false,
DistributedTaskEnum.START);
+ SparkTaskItem sparkTaskItem =
distributionTaskService.getSparkTaskItem(distributedTask);
+ SparkApplication newApplication =
distributionTaskService.getAppBySparkTaskItem(sparkTaskItem);
+ assert (application.equals(newApplication));
+ } catch (JacksonException e) {
+ log.error("testSparkTaskAndApp failed:", e);
}
}