This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new f45dacd1 [Feature][Seatunnel-web] Add support to create job with
single API. (#184)
f45dacd1 is described below
commit f45dacd11759bcf0fb0ad363b334d0a00af5e69a
Author: Mohammad Arshad <[email protected]>
AuthorDate: Thu Aug 15 12:37:52 2024 +0530
[Feature][Seatunnel-web] Add support to create job with single API. (#184)
---
.../org/apache/seatunnel/app/common/Result.java | 12 ++
.../seatunnel/app/controller/JobController.java | 52 ++++++++
.../app/domain/request/connector/JobMode.java | 24 ++++
.../app/domain/request/job/JobCreateReq.java | 32 +++++
.../apache/seatunnel/app/service/IJobService.java | 26 ++++
.../seatunnel/app/service/impl/JobServiceImpl.java | 111 +++++++++++++++++
.../app/service/impl/JobTaskServiceImpl.java | 7 +-
.../app/utils/GlobalExceptionHandler.java | 6 +
.../server/common/ParamValidationException.java | 23 ++++
.../server/common/SeatunnelErrorEnum.java | 2 +
.../app/controller/JobControllerWrapper.java | 49 ++++++++
.../seatunnel/app/test/JobControllerTest.java | 138 +++++++++++++++++++++
.../resources/jobs/fake_source_console_job.json | 101 +++++++++++++++
13 files changed, 582 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/Result.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/Result.java
index 5b03b2a4..38dca30b 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/Result.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/Result.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.app.common;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
+import org.apache.seatunnel.server.common.SeatunnelException;
public class Result<T> {
@@ -45,6 +46,12 @@ public class Result<T> {
this.data = null;
}
+ /**
+  * Builds a result from a {@link SeatunnelException}: the code is taken from the exception's
+  * error enum, the message from the exception itself, and no data is attached.
+  */
+ private Result(SeatunnelException e) {
+ this.code = e.getErrorEnum().getCode();
+ this.msg = e.getMessage();
+ this.data = null;
+ }
+
public static <T> Result<T> success() {
return new Result<>();
}
@@ -65,6 +72,11 @@ public class Result<T> {
return result;
}
+    /**
+     * Wraps a {@link SeatunnelException} in a {@code Result} that carries the exception's error
+     * code and message and no data.
+     *
+     * @param e the exception to report to the caller
+     * @param <T> payload type expected by the caller; the payload itself is always {@code null}
+     * @return a new result describing {@code e}
+     */
+    public static <T> Result<T> getFailure(SeatunnelException e) {
+        return new Result<>(e);
+    }
+
public boolean isSuccess() {
return OK.getCode() == this.code;
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobController.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobController.java
new file mode 100644
index 00000000..a6d941ea
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobController.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.controller;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
+import org.apache.seatunnel.app.service.IJobService;
+
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestAttribute;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+
+import javax.annotation.Resource;
+
+@RestController
+@RequestMapping("/seatunnel/api/v1/job")
+public class JobController {
+
+ @Resource private IJobService jobCRUDService;
+
+ @PostMapping("/create")
+ @ApiOperation(
+ value =
+ "Create a job, In jobDAG for inputPluginId and
targetPluginId use the plugin names instead of ids.",
+ httpMethod = "POST")
+ public Result<Long> createJob(
+ @ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
+ @RequestBody JobCreateReq jobCreateRequest)
+ throws JsonProcessingException {
+ return Result.success(jobCRUDService.createJob(userId,
jobCreateRequest));
+ }
+}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/connector/JobMode.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/connector/JobMode.java
new file mode 100644
index 00000000..2f18f54e
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/connector/JobMode.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.request.connector;
+
+public enum JobMode {
+ BATCH,
+
+ STREAM;
+}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobCreateReq.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobCreateReq.java
new file mode 100644
index 00000000..722ba4b0
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobCreateReq.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.domain.request.job;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * Request body of the single-call job creation API: everything needed to define a job is sent
+ * together instead of through separate calls.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class JobCreateReq {
+ /** Job-level settings; name, description and the env map (including job.mode) are read from it. */
+ private JobConfig jobConfig;
+ /** Plugins of the job; each entry is saved as one task and is addressed by its name in the DAG. */
+ private List<PluginConfig> pluginConfigs;
+ /** Edges between plugins; input/target plugin ids hold plugin names when the request arrives. */
+ private JobDAG jobDAG;
+}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobService.java
new file mode 100644
index 00000000..7faa4a79
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobService.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.service;
+
+import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+/** Service that creates a complete job from one request. */
+public interface IJobService {
+
+ /**
+  * Creates a job on behalf of the given user.
+  *
+  * @param userId id of the user creating the job
+  * @param jobCreateRequest job config, plugin configs and DAG of the job
+  * @return id of the created job
+  * @throws JsonProcessingException presumably raised while (de)serializing job data in the
+  *     underlying services - confirm against the implementations
+  */
+ long createJob(int userId, JobCreateReq jobCreateRequest) throws
JsonProcessingException;
+}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java
new file mode 100644
index 00000000..d51835bd
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.service.impl;
+
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.connector.JobMode;
+import org.apache.seatunnel.app.domain.request.job.Edge;
+import org.apache.seatunnel.app.domain.request.job.JobConfig;
+import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
+import org.apache.seatunnel.app.domain.request.job.JobDAG;
+import org.apache.seatunnel.app.domain.request.job.JobReq;
+import org.apache.seatunnel.app.domain.request.job.PluginConfig;
+import org.apache.seatunnel.app.service.IJobConfigService;
+import org.apache.seatunnel.app.service.IJobDefinitionService;
+import org.apache.seatunnel.app.service.IJobService;
+import org.apache.seatunnel.app.service.IJobTaskService;
+import org.apache.seatunnel.server.common.CodeGenerateUtils;
+import org.apache.seatunnel.server.common.ParamValidationException;
+import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import javax.annotation.Resource;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Service
+public class JobServiceImpl implements IJobService {
+
+ @Resource private IJobDefinitionService jobService;
+ @Resource private IJobTaskService jobTaskService;
+ @Resource private IJobConfigService jobConfigService;
+
+ @Override
+ @Transactional
+ public long createJob(int userId, JobCreateReq jobCreateRequest)
+ throws JsonProcessingException {
+ JobReq jobDefinition =
getJobDefinition(jobCreateRequest.getJobConfig());
+ long jobId = jobService.createJob(userId, jobDefinition);
+ List<PluginConfig> pluginConfig = jobCreateRequest.getPluginConfigs();
+ Map<String, String> pluginNameVsPluginId = new HashMap<>();
+ if (pluginConfig != null) {
+ for (PluginConfig config : pluginConfig) {
+ String pluginId =
String.valueOf(CodeGenerateUtils.getInstance().genCode());
+ config.setPluginId(pluginId);
+ jobTaskService.saveSingleTask(jobId, config);
+ pluginNameVsPluginId.put(config.getName(), pluginId);
+ }
+ }
+ jobConfigService.updateJobConfig(userId, jobId,
jobCreateRequest.getJobConfig());
+ JobDAG jobDAG = jobCreateRequest.getJobDAG();
+ // Replace the plugin name with plugin id
+ List<Edge> edges = jobDAG.getEdges();
+ for (Edge edge : edges) {
+
edge.setInputPluginId(pluginNameVsPluginId.get(edge.getInputPluginId()));
+
edge.setTargetPluginId(pluginNameVsPluginId.get(edge.getTargetPluginId()));
+ }
+ jobTaskService.saveJobDAG(jobId, jobDAG);
+ return jobId;
+ }
+
+ private JobReq getJobDefinition(JobConfig jobConfig) {
+ JobReq jobReq = new JobReq();
+ if (StringUtils.isEmpty(jobConfig.getName())) {
+ throw new
ParamValidationException(SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL, "name");
+ }
+ jobReq.setName(jobConfig.getName());
+ if (StringUtils.isEmpty(jobConfig.getDescription())) {
+ throw new ParamValidationException(
+ SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL, "description");
+ }
+ jobReq.setDescription(jobConfig.getDescription());
+ String jobMode = (String) jobConfig.getEnv().get("job.mode");
+ if (StringUtils.isEmpty(jobMode)) {
+ throw new ParamValidationException(
+ SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL, "job.mode");
+ }
+ if (JobMode.BATCH.name().equals(jobMode)) {
+ jobReq.setJobType(BusinessMode.DATA_INTEGRATION);
+ } else if (JobMode.STREAM.name().equals(jobMode)) {
+ jobReq.setJobType(BusinessMode.DATA_REPLICA);
+ } else {
+ throw new ParamValidationException(
+ SeatunnelErrorEnum.INVALID_PARAM,
+ "job.mode",
+ "job.mode should be either BATCH or STREAM");
+ }
+ return jobReq;
+ }
+}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
index b810ff92..0086cf77 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
@@ -406,6 +406,7 @@ public class JobTaskServiceImpl extends
SeatunnelBaseServiceImpl implements IJob
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.SINGLE_TASK_CREATE, 0);
JobTask jobTask;
JobTask old = jobTaskDao.getTask(jobVersionId,
pluginConfig.getPluginId());
+ String pluginId = pluginConfig.getPluginId();
try {
checkConfigFormat(pluginConfig.getConfig());
long id;
@@ -413,6 +414,10 @@ public class JobTaskServiceImpl extends
SeatunnelBaseServiceImpl implements IJob
id = old.getId();
} else {
id = CodeGenerateUtils.getInstance().genCode();
+ pluginId =
+ pluginId == null
+ ?
String.valueOf(CodeGenerateUtils.getInstance().genCode())
+ : pluginId;
}
String connectorType;
String transformOptionsStr = null;
@@ -429,7 +434,7 @@ public class JobTaskServiceImpl extends
SeatunnelBaseServiceImpl implements IJob
jobTask =
JobTask.builder()
.id(id)
- .pluginId(pluginConfig.getPluginId())
+ .pluginId(pluginId)
.name(pluginConfig.getName())
.type(pluginConfig.getType().name().toUpperCase())
.dataSourceId(pluginConfig.getDataSourceId())
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/GlobalExceptionHandler.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/GlobalExceptionHandler.java
index 839bd311..0dc4da7a 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/GlobalExceptionHandler.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/GlobalExceptionHandler.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.app.utils;
import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
+import org.apache.seatunnel.server.common.ParamValidationException;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
import org.apache.seatunnel.server.common.SeatunnelException;
@@ -82,4 +83,9 @@ public class GlobalExceptionHandler {
private void logError(Throwable throwable) {
log.error(throwable.getMessage(), throwable);
}
+
+ /**
+  * Converts a {@link ParamValidationException} into a {@link Result} that carries the
+  * exception's error code and message. The parameter is declared as the base
+  * {@link SeatunnelException}, but the annotation maps only {@link ParamValidationException}.
+  */
+ @ExceptionHandler(value = ParamValidationException.class)
+ private Result<String> paramValidationHandler(SeatunnelException e) {
+ return Result.getFailure(e);
+ }
}
diff --git
a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/ParamValidationException.java
b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/ParamValidationException.java
new file mode 100644
index 00000000..390248b1
--- /dev/null
+++
b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/ParamValidationException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.server.common;
+
+/** {@link SeatunnelException} subtype used to report a missing or invalid request parameter. */
+public class ParamValidationException extends SeatunnelException {
+ /**
+  * @param e error enum identifying the validation failure
+  * @param msg arguments handed to {@link SeatunnelException}; presumably substituted into the
+  *     enum's message template - confirm against SeatunnelException
+  */
+ public ParamValidationException(SeatunnelErrorEnum e, Object... msg) {
+ super(e, msg);
+ }
+}
diff --git
a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
index d6636240..dfd0cb33 100644
---
a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
+++
b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
@@ -126,6 +126,8 @@ public enum SeatunnelErrorEnum {
"datasource can not be delete because it used by task"),
INVALID_DATASOURCE(-70001, "Datasource [{0}] invalid", "datasource [{0}]
invalid"),
MISSING_PARAM(1777000, "param miss [{0}]", "param miss [{0}]"),
+ PARAM_CAN_NOT_BE_NULL(60018, "", "param [%s] can not be null or empty"),
+ INVALID_PARAM(60019, "", "param [%s] is invalid. %s"),
;
private final int code;
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java
new file mode 100644
index 00000000..61bc2f5f
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.controller;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
+import org.apache.seatunnel.app.utils.JSONTestUtils;
+import org.apache.seatunnel.app.utils.JSONUtils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+public class JobControllerWrapper extends SeatunnelWebTestingBase {
+
+ public Result<Long> createJob(JobCreateReq jobCreateRequest) {
+ String requestBody = JSONUtils.toPrettyJsonString(jobCreateRequest);
+ String response = sendRequest(url("job/create"), requestBody, "POST");
+ return JSONTestUtils.parseObject(response, new
TypeReference<Result<Long>>() {});
+ }
+
+ public JobCreateReq populateJobCreateReqFromFile() {
+ String filePath =
"src/test/resources/jobs/fake_source_console_job.json";
+ String jsonContent;
+ try {
+ jsonContent = new String(Files.readAllBytes(Paths.get(filePath)));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return JSONTestUtils.parseObject(jsonContent, JobCreateReq.class);
+ }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
new file mode 100644
index 00000000..860ce20e
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.test;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
+import org.apache.seatunnel.app.controller.JobControllerWrapper;
+import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
+import
org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
+import org.apache.seatunnel.app.domain.request.job.JobConfig;
+import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
+import org.apache.seatunnel.app.domain.request.job.PluginConfig;
+import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
+import org.apache.seatunnel.app.utils.JobUtils;
+import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class JobControllerTest {
+ private static final SeaTunnelWebCluster seaTunnelWebCluster = new
SeaTunnelWebCluster();
+ private static SeatunnelDatasourceControllerWrapper
seatunnelDatasourceControllerWrapper;
+ private static JobControllerWrapper jobControllerWrapper;
+ private static JobExecutorControllerWrapper jobExecutorControllerWrapper;
+ private static final String uniqueId = "_" + System.currentTimeMillis();
+
+ @BeforeAll
+ public static void setUp() {
+ seaTunnelWebCluster.start();
+ seatunnelDatasourceControllerWrapper = new
SeatunnelDatasourceControllerWrapper();
+ jobControllerWrapper = new JobControllerWrapper();
+ jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
+ }
+
+ @Test
+ public void createJobWithSingleAPI_shouldExecuteSuccessfully() {
+ String jobName = "jobWithSingleAPI" + uniqueId;
+ JobCreateReq jobCreateReq =
jobControllerWrapper.populateJobCreateReqFromFile();
+ jobCreateReq.getJobConfig().setName(jobName);
+ jobCreateReq.getJobConfig().setDescription(jobName + " description");
+ setSourceIds(jobCreateReq, "fake_source_ds1" + uniqueId, "console_ds1"
+ uniqueId);
+
+ Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
+ assertTrue(job.isSuccess());
+ Result<Long> result =
jobExecutorControllerWrapper.jobExecutor(job.getData());
+ assertTrue(result.isSuccess());
+ assertTrue(result.getData() > 0);
+ Result<List<JobPipelineDetailMetricsRes>> listResult =
+ JobUtils.waitForJobCompletion(result.getData());
+ assertEquals(1, listResult.getData().size());
+ assertEquals("FINISHED", listResult.getData().get(0).getStatus());
+ assertEquals(5, listResult.getData().get(0).getReadRowCount());
+ assertEquals(5, listResult.getData().get(0).getWriteRowCount());
+ }
+
+ private void setSourceIds(
+ JobCreateReq jobCreateReq, String fsdSourceName, String
csSourceName) {
+ // Set the data source id for the plugin configs
+ String fakeSourceDataSourceId =
+
seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(fsdSourceName);
+ String consoleDataSourceId =
+
seatunnelDatasourceControllerWrapper.createConsoleDatasource(csSourceName);
+ for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) {
+ if (pluginConfig.getName().equals("source-fake-source")) {
+
pluginConfig.setDataSourceId(Long.parseLong(fakeSourceDataSourceId));
+ } else if (pluginConfig.getName().equals("sink-console")) {
+
pluginConfig.setDataSourceId(Long.parseLong(consoleDataSourceId));
+ }
+ }
+ }
+
+ @Test
+ public void createJobWithSingleAPI_ValidateInput() {
+ JobCreateReq jobCreateReq =
jobControllerWrapper.populateJobCreateReqFromFile();
+ JobConfig jobConfig = jobCreateReq.getJobConfig();
+ jobConfig.setName("");
+ Result<Long> result = jobControllerWrapper.createJob(jobCreateReq);
+ assertTrue(result.isFailed());
+ assertEquals(SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL.getCode(),
result.getCode());
+ assertEquals("param [name] can not be null or empty", result.getMsg());
+
+ String jobName = "jobValidation" + uniqueId;
+ jobConfig.setName(jobName);
+ jobConfig.setDescription(null);
+ result = jobControllerWrapper.createJob(jobCreateReq);
+ assertTrue(result.isFailed());
+ assertEquals(SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL.getCode(),
result.getCode());
+ assertEquals("param [description] can not be null or empty",
result.getMsg());
+
+ jobConfig.setDescription(jobName + " description");
+ jobConfig.getEnv().put("job.mode", "");
+ result = jobControllerWrapper.createJob(jobCreateReq);
+ assertTrue(result.isFailed());
+ assertEquals(SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL.getCode(),
result.getCode());
+ assertEquals("param [job.mode] can not be null or empty",
result.getMsg());
+
+ jobConfig.getEnv().put("job.mode", "InvalidJobMode");
+ result = jobControllerWrapper.createJob(jobCreateReq);
+ assertTrue(result.isFailed());
+ assertEquals(SeatunnelErrorEnum.INVALID_PARAM.getCode(),
result.getCode());
+ assertEquals(
+ "param [job.mode] is invalid. job.mode should be either BATCH
or STREAM",
+ result.getMsg());
+
+ jobConfig.getEnv().put("job.mode", "BATCH");
+ setSourceIds(jobCreateReq, "fake_source_ds2" + uniqueId, "console_ds2"
+ uniqueId);
+ result = jobControllerWrapper.createJob(jobCreateReq);
+ assertTrue(result.isSuccess());
+ assertEquals(0, result.getCode());
+ assertNotNull(result.getData());
+ }
+
+ @AfterAll
+ public static void tearDown() {
+ seaTunnelWebCluster.stop();
+ }
+}
diff --git
a/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json
b/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json
new file mode 100644
index 00000000..acab2f66
--- /dev/null
+++ b/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json
@@ -0,0 +1,101 @@
+{
+ "jobConfig": {
+ "name": "job name",
+ "description": "job description",
+ "engine": "SeaTunnel",
+ "env": {
+ "job.mode": "BATCH",
+ "job.name": "SeaTunnel_Job",
+ "jars": "",
+ "checkpoint.interval": "",
+ "checkpoint.timeout": "",
+ "read_limit.rows_per_second": "",
+ "read_limit.bytes_per_second": "",
+ "custom_parameters": ""
+ }
+ },
+ "pluginConfigs": [
+ {
+ "name": "source-fake-source",
+ "type": "SOURCE",
+ "connectorType": null,
+ "tableOption": {
+ "databases": [
+ "fake_database"
+ ],
+ "tables": [
+ "fake_table"
+ ]
+ },
+ "selectTableFields": {
+ "tableFields": [
+ "name",
+ "age"
+ ],
+ "all": true
+ },
+ "dataSourceId": 1,
+ "sceneMode": "SINGLE_TABLE",
+ "config": "{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields
{\\n name = \\\"string\\\"\\n age = \\\"int\\\"\\n
}\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.template\":\"\",\"doubl
[...]
+ "outputSchema": [
+ {
+ "fields": [
+ {
+ "type": "string",
+ "name": "name",
+ "comment": null,
+ "primaryKey": null,
+ "defaultValue": null,
+ "nullable": null,
+ "properties": null,
+ "unSupport": false,
+ "outputDataType": "STRING"
+ },
+ {
+ "type": "int",
+ "name": "age",
+ "comment": null,
+ "primaryKey": null,
+ "defaultValue": null,
+ "nullable": null,
+ "properties": null,
+ "unSupport": false,
+ "outputDataType": "INT"
+ }
+ ],
+ "tableName": "fake_table",
+ "database": "fake_database"
+ }
+ ],
+ "transformOptions": {}
+ },
+ {
+ "name": "sink-console",
+ "type": "SINK",
+ "connectorType": null,
+ "tableOption": {
+ "databases": [
+ "console_fake_database"
+ ],
+ "tables": [
+ "console_fake_table"
+ ]
+ },
+ "selectTableFields": {
+ "tableFields": [],
+ "all": false
+ },
+ "dataSourceId": 2,
+ "config": "{\"log.print.data\":\"true\",\"log.print.delay.ms\":\"100\"}",
+ "transformOptions": {}
+ }
+ ],
+ "jobDAG": {
+ "edges": [
+ {
+ "inputPluginId": "source-fake-source",
+ "targetPluginId": "sink-console"
+ }
+ ]
+ }
+}