This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new 287c234b [Improve] Optimize the code, add a specific Result type. (#121)
287c234b is described below
commit 287c234b6d11ca595e749f9bd65fb6a3ca3c8b1d
Author: FlechazoW <[email protected]>
AuthorDate: Fri Sep 15 16:36:39 2023 +0800
[Improve] Optimize the code, add a specific Result type. (#121)
---
.../app/controller/JobExecutorController.java | 9 ++++----
.../app/controller/TaskInstanceController.java | 27 ++++++++++------------
.../seatunnel/app/service/IJobExecutorService.java | 6 ++---
.../app/service/ITaskInstanceService.java | 5 ++--
.../app/service/impl/JobExecutorServiceImpl.java | 24 ++++++++-----------
.../app/service/impl/TaskInstanceServiceImpl.java | 9 ++++----
6 files changed, 36 insertions(+), 44 deletions(-)
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
index 23230201..1a41e15b 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
@@ -24,7 +24,6 @@ import org.apache.seatunnel.app.service.IJobInstanceService;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
import org.apache.seatunnel.server.common.SeatunnelException;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -44,12 +43,12 @@ import java.io.IOException;
@RestController
public class JobExecutorController {
- @Autowired IJobExecutorService jobExecutorService;
+ @Resource IJobExecutorService jobExecutorService;
@Resource private IJobInstanceService jobInstanceService;
@GetMapping("/execute")
@ApiOperation(value = "Execute synchronization tasks", httpMethod = "GET")
- public Result jobExecutor(
+ public Result<Long> jobExecutor(
@ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobDefineId", required = true)
@RequestParam("jobDefineId")
Long jobDefineId) {
@@ -73,14 +72,14 @@ public class JobExecutorController {
}
@GetMapping("/pause")
- public Result jobPause(
+ public Result<Void> jobPause(
@ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam
Long jobInstanceId) {
return jobExecutorService.jobPause(userId, jobInstanceId);
}
@GetMapping("/restore")
- public Result jobRestore(
+ public Result<Void> jobRestore(
@ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam
Long jobInstanceId) {
return jobExecutorService.jobStore(userId, jobInstanceId);
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/TaskInstanceController.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/TaskInstanceController.java
index a56e56f1..cdcd35a2 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/TaskInstanceController.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/TaskInstanceController.java
@@ -19,8 +19,8 @@ package org.apache.seatunnel.app.controller;
import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
-import org.apache.seatunnel.app.domain.response.PageInfo;
import org.apache.seatunnel.app.service.ITaskInstanceService;
+import org.apache.seatunnel.app.utils.PageInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
@@ -35,7 +35,7 @@ import io.swagger.annotations.ApiOperation;
@RestController
public class TaskInstanceController {
- @Autowired ITaskInstanceService taskInstanceService;
+ @Autowired ITaskInstanceService<SeaTunnelJobInstanceDto>
taskInstanceService;
@GetMapping("/jobMetrics")
@ApiOperation(value = "get the jobMetrics list ", httpMethod = "GET")
@@ -49,18 +49,15 @@ public class TaskInstanceController {
@RequestParam("syncTaskType") String syncTaskType,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
- Result<PageInfo<SeaTunnelJobInstanceDto>> result =
- taskInstanceService.getSyncTaskInstancePaging(
- userId,
- jobDefineName,
- executorName,
- stateType,
- startTime,
- endTime,
- syncTaskType,
- pageNo,
- pageSize);
-
- return result;
+ return taskInstanceService.getSyncTaskInstancePaging(
+ userId,
+ jobDefineName,
+ executorName,
+ stateType,
+ startTime,
+ endTime,
+ syncTaskType,
+ pageNo,
+ pageSize);
}
}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
index 1d2b1c57..01ce17e5 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.app.common.Result;
public interface IJobExecutorService {
- public Result jobExecute(Integer userId, Long jobDefineId);
+ Result<Long> jobExecute(Integer userId, Long jobDefineId);
- public Result jobPause(Integer userId, Long jobInstanceId);
+ Result<Void> jobPause(Integer userId, Long jobInstanceId);
- public Result jobStore(Integer userId, Long jobInstanceId);
+ Result<Void> jobStore(Integer userId, Long jobInstanceId);
}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
index 644a75b3..77f2d115 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
@@ -18,10 +18,11 @@
package org.apache.seatunnel.app.service;
import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.utils.PageInfo;
-public interface ITaskInstanceService {
+public interface ITaskInstanceService<T> {
- Result getSyncTaskInstancePaging(
+ Result<PageInfo<T>> getSyncTaskInstancePaging(
Integer userId,
String jobDefineName,
String executorName,
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
index ac5b59c1..145e25a9 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
@@ -48,6 +48,7 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -60,7 +61,7 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
@Resource private IJobInstanceDao jobInstanceDao;
@Override
- public Result jobExecute(Integer userId, Long jobDefineId) {
+ public Result<Long> jobExecute(Integer userId, Long jobDefineId) {
JobExecutorRes executeResource =
jobInstanceService.createExecuteResource(userId, jobDefineId);
@@ -75,7 +76,7 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
public String writeJobConfigIntoConfFile(String jobConfig, Long
jobDefineId) {
String projectRoot = System.getProperty("user.dir");
- String filePath = projectRoot + "\\profile\\" +
Long.toString(jobDefineId) + ".conf";
+ String filePath = projectRoot + "\\profile\\" + jobDefineId + ".conf";
try {
File file = new File(filePath);
if (!file.exists()) {
@@ -90,7 +91,7 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
log.info("File created and content written successfully.");
} catch (IOException e) {
- e.printStackTrace();
+ throw new RuntimeException(e);
}
return filePath;
}
@@ -132,11 +133,7 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
SeaTunnelClient seaTunnelClient) {
ExecutorService executor = Executors.newFixedThreadPool(1);
CompletableFuture<JobStatus> future =
- CompletableFuture.supplyAsync(
- () -> {
- return clientJobProxy.waitForJobComplete();
- },
- executor);
+
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete, executor);
try {
log.info("future.get before");
JobStatus jobStatus = future.get();
@@ -160,14 +157,14 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
}
public static String getClusterName(String testClassName) {
- // return System.getProperty("user.name") + "_" + testClassName;
return testClassName;
}
@Override
- public Result jobPause(Integer userId, Long jobInstanceId) {
+ public Result<Void> jobPause(Integer userId, Long jobInstanceId) {
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
- if (getJobStatusFromEngine(jobInstance, jobInstance.getJobEngineId())
== "RUNNING") {
+ if (Objects.equals(
+ getJobStatusFromEngine(jobInstance,
jobInstance.getJobEngineId()), "RUNNING")) {
pauseJobInEngine(jobInstance.getJobEngineId());
}
return Result.success();
@@ -188,12 +185,11 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
}
@Override
- public Result jobStore(Integer userId, Long jobInstanceId) {
+ public Result<Void> jobStore(Integer userId, Long jobInstanceId) {
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
String projectRoot = System.getProperty("user.dir");
- String filePath =
- projectRoot + "\\profile\\" +
Long.toString(jobInstance.getJobDefineId()) + ".conf";
+ String filePath = projectRoot + "\\profile\\" +
jobInstance.getJobDefineId() + ".conf";
SeaTunnelEngineProxy.getInstance()
.restoreJob(filePath, jobInstanceId,
Long.valueOf(jobInstance.getJobEngineId()));
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
index 4515a53a..4c7110e8 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
@@ -47,7 +47,7 @@ import java.util.Map;
@Service
@Slf4j
-public class TaskInstanceServiceImpl implements ITaskInstanceService {
+public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJobInstanceDto> {
@Autowired IJobInstanceDao jobInstanceDao;
@@ -60,7 +60,7 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService {
@Autowired IJobDefinitionDao jobDefinitionDao;
@Override
- public Result getSyncTaskInstancePaging(
+ public Result<PageInfo<SeaTunnelJobInstanceDto>> getSyncTaskInstancePaging(
Integer userId,
String jobDefineName,
String executorName,
@@ -71,7 +71,7 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService {
Integer pageNo,
Integer pageSize) {
JobDefinition jobDefinition = null;
- IPage<SeaTunnelJobInstanceDto> jobInstanceIPage = null;
+ IPage<SeaTunnelJobInstanceDto> jobInstanceIPage;
if (jobDefineName != null) {
jobDefinition = jobDefinitionDao.getJobByName(jobDefineName);
}
@@ -144,8 +144,7 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss");
try {
- Date date = dateFormat.parse(time);
- return date;
+ return dateFormat.parse(time);
} catch (Exception e) {
throw new RuntimeException(e);
}