This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new 3ea6cb81 [Improve][SeaTunnel-Web] Change JobMode and EngineType to
enum type to avoid hard coding (#210)
3ea6cb81 is described below
commit 3ea6cb81c16d34e244efbf65f4a306ffbb3c72bb
Author: ChunFuWu <[email protected]>
AuthorDate: Tue Sep 10 01:05:03 2024 +0800
[Improve][SeaTunnel-Web] Change JobMode and EngineType to enum type to
avoid hard coding (#210)
* [Improve][SeaTunnel-Web] Change JobMode to enum type to avoid hard coding
* [Improve][SeaTunnel-Web] Change JobMode and EngineType to enum type to
avoid hard coding
---
.../app/controller/TaskInstanceController.java | 5 +++--
.../seatunnel/app/dal/dao/IJobInstanceDao.java | 3 ++-
.../app/dal/dao/impl/JobInstanceDaoImpl.java | 3 ++-
.../seatunnel/app/dal/entity/JobInstance.java | 6 ++++--
.../seatunnel/app/dal/entity/JobVersion.java | 5 +++--
.../app/dal/mapper/JobInstanceMapper.java | 3 ++-
.../app/domain/response/engine/Engine.java | 4 +++-
.../domain/response/executor/JobExecutorRes.java | 7 ++++--
.../seatunnel/app/service/IJobMetricsService.java | 3 ++-
.../app/service/ITaskInstanceService.java | 3 ++-
.../app/service/impl/EngineServiceImpl.java | 3 ++-
.../app/service/impl/JobConfigServiceImpl.java | 7 +++---
.../app/service/impl/JobDefinitionServiceImpl.java | 6 +++---
.../app/service/impl/JobInstanceServiceImpl.java | 4 +---
.../app/service/impl/JobMetricsServiceImpl.java | 11 +++++-----
.../seatunnel/app/service/impl/JobServiceImpl.java | 25 ++++++++++++----------
.../app/service/impl/TaskInstanceServiceImpl.java | 22 +++++++------------
.../metrics/EngineMetricsExtractorFactory.java | 3 ++-
.../app/controller/JobConfigControllerWrapper.java | 8 +++----
.../controller/JobDefinitionControllerWrapper.java | 3 ++-
.../controller/TaskInstanceControllerWrapper.java | 16 +++++---------
.../app/domain/JobExecutorResDeserializer.java | 7 +++---
.../seatunnel/app/test/JobControllerTest.java | 11 ++++++----
.../app/test/JobDefinitionControllerTest.java | 13 ++++-------
24 files changed, 93 insertions(+), 88 deletions(-)
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/TaskInstanceController.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/TaskInstanceController.java
index f459c2d4..bac75873 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/TaskInstanceController.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/TaskInstanceController.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.app.service.ITaskInstanceService;
import org.apache.seatunnel.app.utils.PageInfo;
+import org.apache.seatunnel.common.constants.JobMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
@@ -46,7 +47,7 @@ public class TaskInstanceController {
@RequestParam(name = "stateType", required = false) String
stateType,
@RequestParam(name = "startDate", required = false) String
startTime,
@RequestParam(name = "endDate", required = false) String endTime,
- @RequestParam("syncTaskType") String syncTaskType,
+ @RequestParam("syncTaskType") JobMode jobMode,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
return taskInstanceService.getSyncTaskInstancePaging(
@@ -56,7 +57,7 @@ public class TaskInstanceController {
stateType,
startTime,
endTime,
- syncTaskType,
+ jobMode,
pageNo,
pageSize);
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
index d52951cf..ec161933 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.app.dal.dao;
import org.apache.seatunnel.app.dal.entity.JobInstance;
import org.apache.seatunnel.app.dal.mapper.JobInstanceMapper;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
+import org.apache.seatunnel.common.constants.JobMode;
import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.NonNull;
@@ -44,7 +45,7 @@ public interface IJobInstanceDao {
Date startTime,
Date endTime,
String jobDefineId,
- String jobMode);
+ JobMode jobMode);
List<JobInstance> getAllJobInstance(@NonNull List<Long> jobInstanceIdList);
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
index 5600dd4d..8cd2d10b 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.app.dal.dao.IJobInstanceDao;
import org.apache.seatunnel.app.dal.entity.JobInstance;
import org.apache.seatunnel.app.dal.mapper.JobInstanceMapper;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
+import org.apache.seatunnel.common.constants.JobMode;
import org.springframework.stereotype.Repository;
@@ -71,7 +72,7 @@ public class JobInstanceDaoImpl implements IJobInstanceDao {
Date startTime,
Date endTime,
String jobDefineName,
- String jobMode) {
+ JobMode jobMode) {
IPage<SeaTunnelJobInstanceDto> jobInstanceIPage =
jobInstanceMapper.queryJobInstanceListPaging(
page, startTime, endTime, jobDefineName, jobMode);
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java
index 221867d8..05419a89 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.app.dal.entity;
+import org.apache.seatunnel.app.common.EngineType;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.engine.core.job.JobStatus;
import com.baomidou.mybatisplus.annotation.IdType;
@@ -50,7 +52,7 @@ public class JobInstance {
private String jobConfig;
@TableField("engine_name")
- private String engineName;
+ private EngineType engineName;
@TableField("engine_version")
private String engineVersion;
@@ -74,7 +76,7 @@ public class JobInstance {
private Date endTime;
@TableField("job_type")
- private String jobType;
+ private JobMode jobType;
@TableField("error_message")
private String errorMessage;
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobVersion.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobVersion.java
index 3a8abe6e..a60629cd 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobVersion.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobVersion.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.app.dal.entity;
+import org.apache.seatunnel.app.common.EngineType;
import org.apache.seatunnel.common.constants.JobMode;
import com.baomidou.mybatisplus.annotation.IdType;
@@ -47,12 +48,12 @@ public class JobVersion {
/** {@link JobMode} value */
@TableField("job_mode")
- private String jobMode;
+ private JobMode jobMode;
@TableField private String env;
@TableField("engine_name")
- private String engineName;
+ private EngineType engineName;
@TableField("engine_version")
private String engineVersion;
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
index b168995c..29777153 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.app.dal.mapper;
import org.apache.seatunnel.app.dal.entity.JobInstance;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.ibatis.annotations.Param;
@@ -36,7 +37,7 @@ public interface JobInstanceMapper extends
BaseMapper<JobInstance> {
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("jobDefineName") String jobDefineName,
- @Param("jobMode") String jobMode);
+ @Param("jobMode") JobMode jobMode);
JobInstance getJobExecutionStatus(@Param("jobInstanceId") Long
jobInstanceId);
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/engine/Engine.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/engine/Engine.java
index c5ed0001..36c3b24c 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/engine/Engine.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/engine/Engine.java
@@ -16,12 +16,14 @@
*/
package org.apache.seatunnel.app.domain.response.engine;
+import org.apache.seatunnel.app.common.EngineType;
+
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class Engine {
- private String name;
+ private EngineType name;
private String version;
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutorRes.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutorRes.java
index 9e61a63a..865b09b0 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutorRes.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutorRes.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.app.domain.response.executor;
+import org.apache.seatunnel.app.common.EngineType;
+import org.apache.seatunnel.common.constants.JobMode;
+
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -29,7 +32,7 @@ public class JobExecutorRes {
private final String jobConfig;
/** engine name Spark/Flink/SeaTunnel */
- private final String engine;
+ private final EngineType engine;
/** The driver run mode, only spark use now, support 'client' and
'cluster' */
private final String deployMode;
@@ -37,5 +40,5 @@ public class JobExecutorRes {
/** The engine run mode, for SeaTunnel Engine only support 'local' and
null */
private final String master;
- private final String jobMode;
+ private final JobMode jobMode;
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java
index 95a12157..8a885d13 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java
@@ -22,6 +22,7 @@ import
org.apache.seatunnel.app.domain.response.metrics.JobDAG;
import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineSummaryMetricsRes;
import org.apache.seatunnel.app.domain.response.metrics.JobSummaryMetricsRes;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -54,5 +55,5 @@ public interface IJobMetricsService {
@NonNull Integer userId,
@NonNull Map<Long, Long> jobInstanceIdAndJobEngineIdMap,
@NonNull List<Long> jobInstanceIdList,
- @NonNull String syncTaskType);
+ @NonNull JobMode jobMode);
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
index b5ed7209..7ba561f7 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus;
import org.apache.seatunnel.app.utils.PageInfo;
+import org.apache.seatunnel.common.constants.JobMode;
public interface ITaskInstanceService<T> {
@@ -31,7 +32,7 @@ public interface ITaskInstanceService<T> {
String stateType,
String startTime,
String endTime,
- String syncTaskType,
+ JobMode jobMode,
Integer pageNo,
Integer pageSize);
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/EngineServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/EngineServiceImpl.java
index cba0a92c..6fcbd69d 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/EngineServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/EngineServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.app.service.impl;
import org.apache.seatunnel.app.bean.engine.EngineDataType;
+import org.apache.seatunnel.app.common.EngineType;
import org.apache.seatunnel.app.domain.response.engine.Engine;
import
org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant;
import org.apache.seatunnel.app.service.IEngineService;
@@ -36,7 +37,7 @@ public class EngineServiceImpl extends
SeatunnelBaseServiceImpl implements IEngi
Lists.newArrayList(
// new Engine("Spark", "2.4.0"),
// new Engine("Flink", "1.13.6"),
- new Engine("SeaTunnel", "2.3.6")));
+ new Engine(EngineType.SeaTunnel,
"2.3.7")));
@Override
public List<Engine> listSupportEngines() {
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobConfigServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobConfigServiceImpl.java
index 0ec04f53..46b41437 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobConfigServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobConfigServiceImpl.java
@@ -16,7 +16,6 @@
*/
package org.apache.seatunnel.app.service.impl;
-import org.apache.seatunnel.app.common.EngineType;
import org.apache.seatunnel.app.dal.dao.IJobDefinitionDao;
import org.apache.seatunnel.app.dal.dao.IJobVersionDao;
import org.apache.seatunnel.app.dal.entity.JobDefinition;
@@ -74,7 +73,7 @@ public class JobConfigServiceImpl extends
SeatunnelBaseServiceImpl implements IJ
} catch (IOException e) {
throw new RuntimeException(e);
}
- jobConfigRes.setEngine(EngineType.valueOf(jobVersion.getEngineName()));
+ jobConfigRes.setEngine(jobVersion.getEngineName());
return jobConfigRes;
}
@@ -100,8 +99,8 @@ public class JobConfigServiceImpl extends
SeatunnelBaseServiceImpl implements IJ
JobVersion.builder()
.jobId(version.getJobId())
.id(version.getId())
- .jobMode(jobMode.name())
- .engineName(jobConfig.getEngine().name())
+ .jobMode(jobMode)
+ .engineName(jobConfig.getEngine())
.updateUserId(userId)
.env(OBJECT_MAPPER.writeValueAsString(jobConfig.getEnv()))
.build());
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobDefinitionServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobDefinitionServiceImpl.java
index 43e9f035..32b2aa6b 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobDefinitionServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobDefinitionServiceImpl.java
@@ -93,12 +93,12 @@ public class JobDefinitionServiceImpl extends
SeatunnelBaseServiceImpl
.updateUserId(userId)
.name(DEFAULT_VERSION)
.id(uuid)
- .engineName(EngineType.SeaTunnel.name())
+ .engineName(EngineType.SeaTunnel)
.engineVersion("2.3.7");
if (BusinessMode.DATA_INTEGRATION.equals(jobReq.getJobType())) {
- builder.jobMode(JobMode.BATCH.name());
+ builder.jobMode(JobMode.BATCH);
} else if (BusinessMode.DATA_REPLICA.equals(jobReq.getJobType())) {
- builder.jobMode(JobMode.STREAMING.name());
+ builder.jobMode(JobMode.STREAMING);
}
jobVersionDao.createVersion(builder.build());
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
index 70b1201e..5b6147a2 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
@@ -148,9 +148,7 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
jobInstance.setEngineVersion(latestVersion.getEngineVersion());
jobInstance.setJobConfig(jobConfig);
jobInstance.setCreateUserId(userId);
- if (!latestVersion.getJobMode().isEmpty()) {
- jobInstance.setJobType(latestVersion.getJobMode());
- }
+ jobInstance.setJobType(latestVersion.getJobMode());
jobInstanceDao.insert(jobInstance);
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
index f424aee0..947bd07e 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.seatunnel.app.service.impl;
+import org.apache.seatunnel.app.common.EngineType;
import org.apache.seatunnel.app.dal.dao.IJobInstanceDao;
import org.apache.seatunnel.app.dal.dao.IJobInstanceHistoryDao;
import org.apache.seatunnel.app.dal.dao.IJobMetricsDao;
@@ -33,6 +34,7 @@ import
org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy;
import
org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory;
import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
import org.apache.seatunnel.app.utils.JobUtils;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.server.common.CodeGenerateUtils;
import org.apache.seatunnel.server.common.Constants;
@@ -108,7 +110,7 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
@NonNull Integer userId,
@NonNull Map<Long, Long> jobInstanceIdAndJobEngineIdMap,
@NonNull List<Long> jobInstanceIdList,
- @NonNull String syncTaskType) {
+ @NonNull JobMode jobMode) {
log.info("jobInstanceIdAndJobEngineIdMap={}",
jobInstanceIdAndJobEngineIdMap);
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_METRICS_SUMMARY,
userId);
@@ -125,15 +127,14 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
allJobInstance.get(0).getEngineName(),
allJobInstance.get(0).getEngineVersion());
- if (syncTaskType.equals("BATCH")) {
-
+ if (JobMode.BATCH == jobMode) {
result =
getMatricsListIfTaskTypeIsBatch(
allJobInstance,
userId,
allRunningJobMetricsFromEngine,
jobInstanceIdAndJobEngineIdMap);
- } else if (syncTaskType.equals("STREAMING")) {
+ } else if (JobMode.STREAMING == jobMode) {
result =
getMatricsListIfTaskTypeIsStreaming(
allJobInstance,
@@ -425,7 +426,7 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
}
private Map<Long, HashMap<Integer, JobMetrics>>
getAllRunningJobMetricsFromEngine(
- String engineName, String engineVersion) {
+ EngineType engineName, String engineVersion) {
Engine engine = new Engine(engineName, engineVersion);
IEngineMetricsExtractor engineMetricsExtractor =
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java
index 6daef43f..7ac97cc3 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java
@@ -94,20 +94,23 @@ public class JobServiceImpl implements IJobService {
SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL, "description");
}
jobReq.setDescription(jobConfig.getDescription());
- String jobMode = (String) jobConfig.getEnv().get("job.mode");
- if (StringUtils.isEmpty(jobMode)) {
- throw new ParamValidationException(
- SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL, "job.mode");
- }
- if (JobMode.BATCH.name().equals(jobMode)) {
- jobReq.setJobType(BusinessMode.DATA_INTEGRATION);
- } else if (JobMode.STREAMING.name().equals(jobMode)) {
- jobReq.setJobType(BusinessMode.DATA_REPLICA);
- } else {
+ try {
+ JobMode jobMode = JobMode.valueOf((String)
jobConfig.getEnv().get("job.mode"));
+ if (JobMode.BATCH == jobMode) {
+ jobReq.setJobType(BusinessMode.DATA_INTEGRATION);
+ } else if (JobMode.STREAMING == jobMode) {
+ jobReq.setJobType(BusinessMode.DATA_REPLICA);
+ } else {
+ throw new ParamValidationException(
+ SeatunnelErrorEnum.INVALID_PARAM,
+ "job.mode",
+ "job.mode should be either BATCH or STREAMING");
+ }
+ } catch (Exception e) {
throw new ParamValidationException(
SeatunnelErrorEnum.INVALID_PARAM,
"job.mode",
- "job.mode should be either BATCH or STREAM");
+ "job.mode should be either BATCH or STREAMING");
}
return jobReq;
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
index 05bc071c..dd249e71 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.app.service.IJobDefinitionService;
import org.apache.seatunnel.app.service.IJobMetricsService;
import org.apache.seatunnel.app.service.ITaskInstanceService;
import org.apache.seatunnel.app.utils.PageInfo;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
import org.apache.seatunnel.server.common.SeatunnelException;
@@ -73,7 +74,7 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJo
String stateType,
String startTime,
String endTime,
- String syncTaskType,
+ JobMode jobMode,
Integer pageNo,
Integer pageSize) {
Result<PageInfo<SeaTunnelJobInstanceDto>> result = new Result<>();
@@ -86,17 +87,13 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJo
IPage<SeaTunnelJobInstanceDto> jobInstanceIPage =
jobInstanceDao.queryJobInstanceListPaging(
- new Page<>(pageNo, pageSize),
- startDate,
- endDate,
- jobDefineName,
- syncTaskType);
+ new Page<>(pageNo, pageSize), startDate, endDate,
jobDefineName, jobMode);
List<SeaTunnelJobInstanceDto> records = jobInstanceIPage.getRecords();
if (CollectionUtils.isEmpty(records)) {
return result;
}
- populateExecutionMetricsData(userId, syncTaskType, records);
+ populateExecutionMetricsData(userId, jobMode, records);
pageInfo.setTotal((int) jobInstanceIPage.getTotal());
pageInfo.setTotalList(records);
result.setData(pageInfo);
@@ -104,10 +101,10 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJo
}
private void populateExecutionMetricsData(
- Integer userId, String syncTaskType, List<SeaTunnelJobInstanceDto>
records) {
+ Integer userId, JobMode jobMode, List<SeaTunnelJobInstanceDto>
records) {
addJobDefineNameToResult(records);
addRunningTimeToResult(records);
- jobPipelineSummaryMetrics(records, syncTaskType, userId);
+ jobPipelineSummaryMetrics(records, jobMode, userId);
}
private void addRunningTimeToResult(List<SeaTunnelJobInstanceDto> records)
{
@@ -150,7 +147,7 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJo
}
private void jobPipelineSummaryMetrics(
- List<SeaTunnelJobInstanceDto> records, String syncTaskType,
Integer userId) {
+ List<SeaTunnelJobInstanceDto> records, JobMode jobMode, Integer
userId) {
try {
ArrayList<Long> jobInstanceIdList = new ArrayList<>();
HashMap<Long, Long> jobInstanceIdAndJobEngineIdMap = new
HashMap<>();
@@ -165,10 +162,7 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJo
Map<Long, JobSummaryMetricsRes> jobSummaryMetrics =
jobMetricsService.getALLJobSummaryMetrics(
- userId,
- jobInstanceIdAndJobEngineIdMap,
- jobInstanceIdList,
- syncTaskType);
+ userId, jobInstanceIdAndJobEngineIdMap,
jobInstanceIdList, jobMode);
for (SeaTunnelJobInstanceDto taskInstance : records) {
if (jobSummaryMetrics.get(taskInstance.getId()) != null) {
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/metrics/EngineMetricsExtractorFactory.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/metrics/EngineMetricsExtractorFactory.java
index 40656093..a0e56931 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/metrics/EngineMetricsExtractorFactory.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/metrics/EngineMetricsExtractorFactory.java
@@ -16,6 +16,7 @@
*/
package org.apache.seatunnel.app.thirdparty.metrics;
+import org.apache.seatunnel.app.common.EngineType;
import org.apache.seatunnel.app.domain.response.engine.Engine;
import
org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineMetricsExtractor;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
@@ -31,7 +32,7 @@ public class EngineMetricsExtractorFactory {
private final Engine engine;
public IEngineMetricsExtractor getEngineMetricsExtractor() {
- if (engine.getName().equals("SeaTunnel")) {
+ if (engine.getName() == EngineType.SeaTunnel) {
return SeaTunnelEngineMetricsExtractor.getInstance();
}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobConfigControllerWrapper.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobConfigControllerWrapper.java
index 55c5627c..e1835a1d 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobConfigControllerWrapper.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobConfigControllerWrapper.java
@@ -23,11 +23,11 @@ import
org.apache.seatunnel.app.domain.request.job.JobConfig;
import org.apache.seatunnel.app.domain.response.job.JobConfigRes;
import org.apache.seatunnel.app.utils.JSONTestUtils;
import org.apache.seatunnel.app.utils.JSONUtils;
-
-import org.apache.commons.collections.map.HashedMap;
+import org.apache.seatunnel.common.constants.JobMode;
import com.fasterxml.jackson.core.type.TypeReference;
+import java.util.HashMap;
import java.util.Map;
public class JobConfigControllerWrapper extends SeatunnelWebTestingBase {
@@ -48,8 +48,8 @@ public class JobConfigControllerWrapper extends
SeatunnelWebTestingBase {
jobConfig.setName(jobName);
jobConfig.setDescription(jobName + " description from config");
jobConfig.setEngine(EngineType.SeaTunnel);
- Map<String, Object> env = new HashedMap();
- env.put("job.mode", "BATCH");
+ Map<String, Object> env = new HashMap<>();
+ env.put("job.mode", JobMode.BATCH);
env.put("job.name", "SeaTunnel_Job");
env.put("jars", "");
env.put("checkpoint.interval", "30");
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobDefinitionControllerWrapper.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobDefinitionControllerWrapper.java
index 33fc3338..b303bb79 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobDefinitionControllerWrapper.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobDefinitionControllerWrapper.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.app.domain.response.PageInfo;
import org.apache.seatunnel.app.domain.response.job.JobDefinitionRes;
import org.apache.seatunnel.app.utils.JSONTestUtils;
import org.apache.seatunnel.app.utils.JSONUtils;
+import org.apache.seatunnel.common.constants.JobMode;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -48,7 +49,7 @@ public class JobDefinitionControllerWrapper extends
SeatunnelWebTestingBase {
}
public Result<PageInfo<JobDefinitionRes>> getJobDefinition(
- String searchName, Integer pageNo, Integer pageSize, String
jobMode) {
+ String searchName, Integer pageNo, Integer pageSize, JobMode
jobMode) {
String response =
sendRequest(
urlWithParam("job/definition?")
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/TaskInstanceControllerWrapper.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/TaskInstanceControllerWrapper.java
index d07a06a5..6558bff9 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/TaskInstanceControllerWrapper.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/TaskInstanceControllerWrapper.java
@@ -22,6 +22,7 @@ import
org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.app.utils.JSONTestUtils;
import org.apache.seatunnel.app.utils.PageInfo;
+import org.apache.seatunnel.common.constants.JobMode;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -42,7 +43,7 @@ public class TaskInstanceControllerWrapper extends
SeatunnelWebTestingBase {
String stateType,
String startTime,
String endTime,
- String syncTaskType,
+ JobMode jobMode,
Integer pageNo,
Integer pageSize) {
String response =
@@ -59,7 +60,7 @@ public class TaskInstanceControllerWrapper extends
SeatunnelWebTestingBase {
+ "&endDate="
+ endTime
+ "&syncTaskType="
- + syncTaskType
+ + jobMode
+ "&pageNo="
+ pageNo
+ "&pageSize="
@@ -77,19 +78,12 @@ public class TaskInstanceControllerWrapper extends
SeatunnelWebTestingBase {
URLEncoder.encode(
dateFormat.format(
new Date(System.currentTimeMillis() + 1000 *
60 * 60 * 24)));
- String syncTaskType = "BATCH";
+ JobMode jobMode = JobMode.BATCH;
Integer pageNo = 1;
Integer pageSize = 10;
Result<PageInfo<SeaTunnelJobInstanceDto>> result =
getTaskInstanceList(
- jobDefineName,
- null,
- null,
- startTime,
- endTime,
- syncTaskType,
- pageNo,
- pageSize);
+ jobDefineName, null, null, startTime, endTime,
jobMode, pageNo, pageSize);
assertTrue(result.isSuccess());
if (result.getData().getTotalList().isEmpty()) {
return null;
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/JobExecutorResDeserializer.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/JobExecutorResDeserializer.java
index 5aef4554..681a263a 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/JobExecutorResDeserializer.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/JobExecutorResDeserializer.java
@@ -16,7 +16,9 @@
*/
package org.apache.seatunnel.app.domain;
+import org.apache.seatunnel.app.common.EngineType;
import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
+import org.apache.seatunnel.common.constants.JobMode;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -35,11 +37,10 @@ public class JobExecutorResDeserializer extends
JsonDeserializer<JobExecutorRes>
JsonNode node = jsonParser.getCodec().readTree(jsonParser);
Long jobInstanceId = node.get("jobInstanceId").asLong();
String jobConfig = node.get("jobConfig").asText();
- String engine = node.get("engine").asText();
+ EngineType engine = EngineType.valueOf(node.get("engine").asText());
String deployMode = node.get("deployMode").asText();
String master = node.get("master").asText();
- String jobMode = node.get("jobMode").asText();
-
+ JobMode jobMode = JobMode.valueOf(node.get("jobMode").asText());
return new JobExecutorRes(jobInstanceId, jobConfig, engine,
deployMode, master, jobMode);
}
}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
index 6b51a94c..b8725b41 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
@@ -28,6 +28,7 @@ import
org.apache.seatunnel.app.domain.response.job.JobConfigRes;
import org.apache.seatunnel.app.domain.response.job.JobRes;
import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
import org.apache.seatunnel.app.utils.JobTestingUtils;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
@@ -99,18 +100,20 @@ public class JobControllerTest {
jobConfig.getEnv().put("job.mode", "");
result = jobControllerWrapper.createJob(jobCreateReq);
assertTrue(result.isFailed());
- assertEquals(SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL.getCode(),
result.getCode());
- assertEquals("param [job.mode] can not be null or empty",
result.getMsg());
+ assertEquals(SeatunnelErrorEnum.INVALID_PARAM.getCode(),
result.getCode());
+ assertEquals(
+ "param [job.mode] is invalid. job.mode should be either BATCH
or STREAMING",
+ result.getMsg());
jobConfig.getEnv().put("job.mode", "InvalidJobMode");
result = jobControllerWrapper.createJob(jobCreateReq);
assertTrue(result.isFailed());
assertEquals(SeatunnelErrorEnum.INVALID_PARAM.getCode(),
result.getCode());
assertEquals(
- "param [job.mode] is invalid. job.mode should be either BATCH
or STREAM",
+ "param [job.mode] is invalid. job.mode should be either BATCH
or STREAMING",
result.getMsg());
- jobConfig.getEnv().put("job.mode", "BATCH");
+ jobConfig.getEnv().put("job.mode", JobMode.BATCH);
// setSourceIds(jobCreateReq, "fake_source_create2" + uniqueId,
"console_create2" +
// uniqueId);
result = jobControllerWrapper.createJob(jobCreateReq);
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobDefinitionControllerTest.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobDefinitionControllerTest.java
index 8b756bf9..26046cd1 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobDefinitionControllerTest.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobDefinitionControllerTest.java
@@ -64,14 +64,12 @@ public class JobDefinitionControllerTest {
String job3 = "job3" + uniqueId;
long jobId = jobDefinitionControllerWrapper.createJobDefinition(job3);
Result<PageInfo<JobDefinitionRes>> result =
- jobDefinitionControllerWrapper.getJobDefinition(job3, 1, 10,
JobMode.BATCH.name());
+ jobDefinitionControllerWrapper.getJobDefinition(job3, 1, 10,
JobMode.BATCH);
assertTrue(result.isSuccess());
assertEquals(1, result.getData().getData().size());
assertEquals(jobId, result.getData().getData().get(0).getId());
- result =
- jobDefinitionControllerWrapper.getJobDefinition(
- job3, 1, 10, JobMode.STREAMING.name());
+ result = jobDefinitionControllerWrapper.getJobDefinition(job3, 1, 10,
JobMode.STREAMING);
assertTrue(result.isSuccess());
assertEquals(0, result.getData().getData().size());
@@ -85,14 +83,11 @@ public class JobDefinitionControllerTest {
assertTrue(jobDefinition.isSuccess());
jobId = jobDefinition.getData();
- result =
- jobDefinitionControllerWrapper.getJobDefinition(job31, 1, 10,
JobMode.BATCH.name());
+ result = jobDefinitionControllerWrapper.getJobDefinition(job31, 1, 10,
JobMode.BATCH);
assertTrue(result.isSuccess());
assertEquals(0, result.getData().getData().size());
- result =
- jobDefinitionControllerWrapper.getJobDefinition(
- job31, 1, 10, JobMode.STREAMING.name());
+ result = jobDefinitionControllerWrapper.getJobDefinition(job31, 1, 10,
JobMode.STREAMING);
assertTrue(result.isSuccess());
assertEquals(1, result.getData().getData().size());
assertEquals(jobId, result.getData().getData().get(0).getId());