This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git


The following commit(s) were added to refs/heads/main by this push:
     new f45dacd1 [Feature][Seatunnel-web] Add support to create job with single API. (#184)
f45dacd1 is described below

commit f45dacd11759bcf0fb0ad363b334d0a00af5e69a
Author: Mohammad Arshad <[email protected]>
AuthorDate: Thu Aug 15 12:37:52 2024 +0530

    [Feature][Seatunnel-web] Add support to create job with single API. (#184)
---
 .../org/apache/seatunnel/app/common/Result.java    |  12 ++
 .../seatunnel/app/controller/JobController.java    |  52 ++++++++
 .../app/domain/request/connector/JobMode.java      |  24 ++++
 .../app/domain/request/job/JobCreateReq.java       |  32 +++++
 .../apache/seatunnel/app/service/IJobService.java  |  26 ++++
 .../seatunnel/app/service/impl/JobServiceImpl.java | 111 +++++++++++++++++
 .../app/service/impl/JobTaskServiceImpl.java       |   7 +-
 .../app/utils/GlobalExceptionHandler.java          |   6 +
 .../server/common/ParamValidationException.java    |  23 ++++
 .../server/common/SeatunnelErrorEnum.java          |   2 +
 .../app/controller/JobControllerWrapper.java       |  49 ++++++++
 .../seatunnel/app/test/JobControllerTest.java      | 138 +++++++++++++++++++++
 .../resources/jobs/fake_source_console_job.json    | 101 +++++++++++++++
 13 files changed, 582 insertions(+), 1 deletion(-)

diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/Result.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/Result.java
index 5b03b2a4..38dca30b 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/Result.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/Result.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.app.common;
 
 import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
+import org.apache.seatunnel.server.common.SeatunnelException;
 
 public class Result<T> {
 
@@ -45,6 +46,12 @@ public class Result<T> {
         this.data = null;
     }
 
+    private Result(SeatunnelException e) {
+        this.code = e.getErrorEnum().getCode();
+        this.msg = e.getMessage();
+        this.data = null;
+    }
+
     public static <T> Result<T> success() {
         return new Result<>();
     }
@@ -65,6 +72,11 @@ public class Result<T> {
         return result;
     }
 
+    public static <T> Result<T> getFailure(SeatunnelException e) {
+        Result<T> result = new Result<>(e);
+        return result;
+    }
+
     public boolean isSuccess() {
         return OK.getCode() == this.code;
     }
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobController.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobController.java
new file mode 100644
index 00000000..a6d941ea
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobController.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.controller;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
+import org.apache.seatunnel.app.service.IJobService;
+
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestAttribute;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+
+import javax.annotation.Resource;
+
+@RestController
+@RequestMapping("/seatunnel/api/v1/job")
+public class JobController {
+
+    @Resource private IJobService jobCRUDService;
+
+    @PostMapping("/create")
+    @ApiOperation(
+            value =
+                    "Create a job, In jobDAG for inputPluginId and 
targetPluginId use the plugin names instead of ids.",
+            httpMethod = "POST")
+    public Result<Long> createJob(
+            @ApiParam(value = "userId", required = true) 
@RequestAttribute("userId") Integer userId,
+            @RequestBody JobCreateReq jobCreateRequest)
+            throws JsonProcessingException {
+        return Result.success(jobCRUDService.createJob(userId, 
jobCreateRequest));
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/connector/JobMode.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/connector/JobMode.java
new file mode 100644
index 00000000..2f18f54e
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/connector/JobMode.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.request.connector;
+
+public enum JobMode {
+    BATCH,
+
+    STREAM;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobCreateReq.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobCreateReq.java
new file mode 100644
index 00000000..722ba4b0
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobCreateReq.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.domain.request.job;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class JobCreateReq {
+    private JobConfig jobConfig;
+    private List<PluginConfig> pluginConfigs;
+    private JobDAG jobDAG;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobService.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobService.java
new file mode 100644
index 00000000..7faa4a79
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobService.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.service;
+
+import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+public interface IJobService {
+
+    long createJob(int userId, JobCreateReq jobCreateRequest) throws 
JsonProcessingException;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java
new file mode 100644
index 00000000..d51835bd
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.service.impl;
+
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.connector.JobMode;
+import org.apache.seatunnel.app.domain.request.job.Edge;
+import org.apache.seatunnel.app.domain.request.job.JobConfig;
+import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
+import org.apache.seatunnel.app.domain.request.job.JobDAG;
+import org.apache.seatunnel.app.domain.request.job.JobReq;
+import org.apache.seatunnel.app.domain.request.job.PluginConfig;
+import org.apache.seatunnel.app.service.IJobConfigService;
+import org.apache.seatunnel.app.service.IJobDefinitionService;
+import org.apache.seatunnel.app.service.IJobService;
+import org.apache.seatunnel.app.service.IJobTaskService;
+import org.apache.seatunnel.server.common.CodeGenerateUtils;
+import org.apache.seatunnel.server.common.ParamValidationException;
+import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import javax.annotation.Resource;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Service
+public class JobServiceImpl implements IJobService {
+
+    @Resource private IJobDefinitionService jobService;
+    @Resource private IJobTaskService jobTaskService;
+    @Resource private IJobConfigService jobConfigService;
+
+    @Override
+    @Transactional
+    public long createJob(int userId, JobCreateReq jobCreateRequest)
+            throws JsonProcessingException {
+        JobReq jobDefinition = 
getJobDefinition(jobCreateRequest.getJobConfig());
+        long jobId = jobService.createJob(userId, jobDefinition);
+        List<PluginConfig> pluginConfig = jobCreateRequest.getPluginConfigs();
+        Map<String, String> pluginNameVsPluginId = new HashMap<>();
+        if (pluginConfig != null) {
+            for (PluginConfig config : pluginConfig) {
+                String pluginId = 
String.valueOf(CodeGenerateUtils.getInstance().genCode());
+                config.setPluginId(pluginId);
+                jobTaskService.saveSingleTask(jobId, config);
+                pluginNameVsPluginId.put(config.getName(), pluginId);
+            }
+        }
+        jobConfigService.updateJobConfig(userId, jobId, 
jobCreateRequest.getJobConfig());
+        JobDAG jobDAG = jobCreateRequest.getJobDAG();
+        // Replace the plugin name with plugin id
+        List<Edge> edges = jobDAG.getEdges();
+        for (Edge edge : edges) {
+            
edge.setInputPluginId(pluginNameVsPluginId.get(edge.getInputPluginId()));
+            
edge.setTargetPluginId(pluginNameVsPluginId.get(edge.getTargetPluginId()));
+        }
+        jobTaskService.saveJobDAG(jobId, jobDAG);
+        return jobId;
+    }
+
+    private JobReq getJobDefinition(JobConfig jobConfig) {
+        JobReq jobReq = new JobReq();
+        if (StringUtils.isEmpty(jobConfig.getName())) {
+            throw new 
ParamValidationException(SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL, "name");
+        }
+        jobReq.setName(jobConfig.getName());
+        if (StringUtils.isEmpty(jobConfig.getDescription())) {
+            throw new ParamValidationException(
+                    SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL, "description");
+        }
+        jobReq.setDescription(jobConfig.getDescription());
+        String jobMode = (String) jobConfig.getEnv().get("job.mode");
+        if (StringUtils.isEmpty(jobMode)) {
+            throw new ParamValidationException(
+                    SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL, "job.mode");
+        }
+        if (JobMode.BATCH.name().equals(jobMode)) {
+            jobReq.setJobType(BusinessMode.DATA_INTEGRATION);
+        } else if (JobMode.STREAM.name().equals(jobMode)) {
+            jobReq.setJobType(BusinessMode.DATA_REPLICA);
+        } else {
+            throw new ParamValidationException(
+                    SeatunnelErrorEnum.INVALID_PARAM,
+                    "job.mode",
+                    "job.mode should be either BATCH or STREAM");
+        }
+        return jobReq;
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
index b810ff92..0086cf77 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
@@ -406,6 +406,7 @@ public class JobTaskServiceImpl extends SeatunnelBaseServiceImpl implements IJob
         
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.SINGLE_TASK_CREATE, 0);
         JobTask jobTask;
         JobTask old = jobTaskDao.getTask(jobVersionId, 
pluginConfig.getPluginId());
+        String pluginId = pluginConfig.getPluginId();
         try {
             checkConfigFormat(pluginConfig.getConfig());
             long id;
@@ -413,6 +414,10 @@ public class JobTaskServiceImpl extends SeatunnelBaseServiceImpl implements IJob
                 id = old.getId();
             } else {
                 id = CodeGenerateUtils.getInstance().genCode();
+                pluginId =
+                        pluginId == null
+                                ? 
String.valueOf(CodeGenerateUtils.getInstance().genCode())
+                                : pluginId;
             }
             String connectorType;
             String transformOptionsStr = null;
@@ -429,7 +434,7 @@ public class JobTaskServiceImpl extends SeatunnelBaseServiceImpl implements IJob
             jobTask =
                     JobTask.builder()
                             .id(id)
-                            .pluginId(pluginConfig.getPluginId())
+                            .pluginId(pluginId)
                             .name(pluginConfig.getName())
                             .type(pluginConfig.getType().name().toUpperCase())
                             .dataSourceId(pluginConfig.getDataSourceId())
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/GlobalExceptionHandler.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/GlobalExceptionHandler.java
index 839bd311..0dc4da7a 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/GlobalExceptionHandler.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/GlobalExceptionHandler.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.app.utils;
 
 import org.apache.seatunnel.app.common.Result;
 import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
+import org.apache.seatunnel.server.common.ParamValidationException;
 import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
 import org.apache.seatunnel.server.common.SeatunnelException;
 
@@ -82,4 +83,9 @@ public class GlobalExceptionHandler {
     private void logError(Throwable throwable) {
         log.error(throwable.getMessage(), throwable);
     }
+
+    @ExceptionHandler(value = ParamValidationException.class)
+    private Result<String> paramValidationHandler(SeatunnelException e) {
+        return Result.getFailure(e);
+    }
 }
diff --git a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/ParamValidationException.java b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/ParamValidationException.java
new file mode 100644
index 00000000..390248b1
--- /dev/null
+++ b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/ParamValidationException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.server.common;
+
+public class ParamValidationException extends SeatunnelException {
+    public ParamValidationException(SeatunnelErrorEnum e, Object... msg) {
+        super(e, msg);
+    }
+}
diff --git a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
index d6636240..dfd0cb33 100644
--- a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
+++ b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
@@ -126,6 +126,8 @@ public enum SeatunnelErrorEnum {
             "datasource can not be delete because it used by task"),
     INVALID_DATASOURCE(-70001, "Datasource [{0}] invalid", "datasource [{0}] 
invalid"),
     MISSING_PARAM(1777000, "param miss [{0}]", "param miss [{0}]"),
+    PARAM_CAN_NOT_BE_NULL(60018, "", "param [%s] can not be null or empty"),
+    INVALID_PARAM(60019, "", "param [%s] is invalid. %s"),
     ;
 
     private final int code;
diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java
new file mode 100644
index 00000000..61bc2f5f
--- /dev/null
+++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.controller;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
+import org.apache.seatunnel.app.utils.JSONTestUtils;
+import org.apache.seatunnel.app.utils.JSONUtils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+public class JobControllerWrapper extends SeatunnelWebTestingBase {
+
+    public Result<Long> createJob(JobCreateReq jobCreateRequest) {
+        String requestBody = JSONUtils.toPrettyJsonString(jobCreateRequest);
+        String response = sendRequest(url("job/create"), requestBody, "POST");
+        return JSONTestUtils.parseObject(response, new 
TypeReference<Result<Long>>() {});
+    }
+
+    public JobCreateReq populateJobCreateReqFromFile() {
+        String filePath = 
"src/test/resources/jobs/fake_source_console_job.json";
+        String jsonContent;
+        try {
+            jsonContent = new String(Files.readAllBytes(Paths.get(filePath)));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return JSONTestUtils.parseObject(jsonContent, JobCreateReq.class);
+    }
+}
diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
new file mode 100644
index 00000000..860ce20e
--- /dev/null
+++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.test;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
+import org.apache.seatunnel.app.controller.JobControllerWrapper;
+import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
+import 
org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
+import org.apache.seatunnel.app.domain.request.job.JobConfig;
+import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
+import org.apache.seatunnel.app.domain.request.job.PluginConfig;
+import 
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
+import org.apache.seatunnel.app.utils.JobUtils;
+import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class JobControllerTest {
+    private static final SeaTunnelWebCluster seaTunnelWebCluster = new 
SeaTunnelWebCluster();
+    private static SeatunnelDatasourceControllerWrapper 
seatunnelDatasourceControllerWrapper;
+    private static JobControllerWrapper jobControllerWrapper;
+    private static JobExecutorControllerWrapper jobExecutorControllerWrapper;
+    private static final String uniqueId = "_" + System.currentTimeMillis();
+
+    @BeforeAll
+    public static void setUp() {
+        seaTunnelWebCluster.start();
+        seatunnelDatasourceControllerWrapper = new 
SeatunnelDatasourceControllerWrapper();
+        jobControllerWrapper = new JobControllerWrapper();
+        jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
+    }
+
+    @Test
+    public void createJobWithSingleAPI_shouldExecuteSuccessfully() {
+        String jobName = "jobWithSingleAPI" + uniqueId;
+        JobCreateReq jobCreateReq = 
jobControllerWrapper.populateJobCreateReqFromFile();
+        jobCreateReq.getJobConfig().setName(jobName);
+        jobCreateReq.getJobConfig().setDescription(jobName + " description");
+        setSourceIds(jobCreateReq, "fake_source_ds1" + uniqueId, "console_ds1" 
+ uniqueId);
+
+        Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
+        assertTrue(job.isSuccess());
+        Result<Long> result = 
jobExecutorControllerWrapper.jobExecutor(job.getData());
+        assertTrue(result.isSuccess());
+        assertTrue(result.getData() > 0);
+        Result<List<JobPipelineDetailMetricsRes>> listResult =
+                JobUtils.waitForJobCompletion(result.getData());
+        assertEquals(1, listResult.getData().size());
+        assertEquals("FINISHED", listResult.getData().get(0).getStatus());
+        assertEquals(5, listResult.getData().get(0).getReadRowCount());
+        assertEquals(5, listResult.getData().get(0).getWriteRowCount());
+    }
+
+    private void setSourceIds(
+            JobCreateReq jobCreateReq, String fsdSourceName, String 
csSourceName) {
+        // Set the data source id for the plugin configs
+        String fakeSourceDataSourceId =
+                
seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(fsdSourceName);
+        String consoleDataSourceId =
+                
seatunnelDatasourceControllerWrapper.createConsoleDatasource(csSourceName);
+        for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) {
+            if (pluginConfig.getName().equals("source-fake-source")) {
+                
pluginConfig.setDataSourceId(Long.parseLong(fakeSourceDataSourceId));
+            } else if (pluginConfig.getName().equals("sink-console")) {
+                
pluginConfig.setDataSourceId(Long.parseLong(consoleDataSourceId));
+            }
+        }
+    }
+
+    @Test
+    public void createJobWithSingleAPI_ValidateInput() {
+        JobCreateReq jobCreateReq = 
jobControllerWrapper.populateJobCreateReqFromFile();
+        JobConfig jobConfig = jobCreateReq.getJobConfig();
+        jobConfig.setName("");
+        Result<Long> result = jobControllerWrapper.createJob(jobCreateReq);
+        assertTrue(result.isFailed());
+        assertEquals(SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL.getCode(), 
result.getCode());
+        assertEquals("param [name] can not be null or empty", result.getMsg());
+
+        String jobName = "jobValidation" + uniqueId;
+        jobConfig.setName(jobName);
+        jobConfig.setDescription(null);
+        result = jobControllerWrapper.createJob(jobCreateReq);
+        assertTrue(result.isFailed());
+        assertEquals(SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL.getCode(), 
result.getCode());
+        assertEquals("param [description] can not be null or empty", 
result.getMsg());
+
+        jobConfig.setDescription(jobName + " description");
+        jobConfig.getEnv().put("job.mode", "");
+        result = jobControllerWrapper.createJob(jobCreateReq);
+        assertTrue(result.isFailed());
+        assertEquals(SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL.getCode(), 
result.getCode());
+        assertEquals("param [job.mode] can not be null or empty", 
result.getMsg());
+
+        jobConfig.getEnv().put("job.mode", "InvalidJobMode");
+        result = jobControllerWrapper.createJob(jobCreateReq);
+        assertTrue(result.isFailed());
+        assertEquals(SeatunnelErrorEnum.INVALID_PARAM.getCode(), 
result.getCode());
+        assertEquals(
+                "param [job.mode] is invalid. job.mode should be either BATCH 
or STREAM",
+                result.getMsg());
+
+        jobConfig.getEnv().put("job.mode", "BATCH");
+        setSourceIds(jobCreateReq, "fake_source_ds2" + uniqueId, "console_ds2" 
+ uniqueId);
+        result = jobControllerWrapper.createJob(jobCreateReq);
+        assertTrue(result.isSuccess());
+        assertEquals(0, result.getCode());
+        assertNotNull(result.getData());
+    }
+
+    @AfterAll
+    public static void tearDown() {
+        seaTunnelWebCluster.stop();
+    }
+}
diff --git a/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json b/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json
new file mode 100644
index 00000000..acab2f66
--- /dev/null
+++ b/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json
@@ -0,0 +1,101 @@
+{
+  "jobConfig": {
+    "name": "job name",
+    "description": "job description",
+    "engine": "SeaTunnel",
+    "env": {
+      "job.mode": "BATCH",
+      "job.name": "SeaTunnel_Job",
+      "jars": "",
+      "checkpoint.interval": "",
+      "checkpoint.timeout": "",
+      "read_limit.rows_per_second": "",
+      "read_limit.bytes_per_second": "",
+      "custom_parameters": ""
+    }
+  },
+  "pluginConfigs": [
+    {
+      "name": "source-fake-source",
+      "type": "SOURCE",
+      "connectorType": null,
+      "tableOption": {
+        "databases": [
+          "fake_database"
+        ],
+        "tables": [
+          "fake_table"
+        ]
+      },
+      "selectTableFields": {
+        "tableFields": [
+          "name",
+          "age"
+        ],
+        "all": true
+      },
+      "dataSourceId": 1,
+      "sceneMode": "SINGLE_TABLE",
+      "config": "{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields 
{\\n        name = \\\"string\\\"\\n        age = \\\"int\\\"\\n      
}\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.template\":\"\",\"doubl
 [...]
+      "outputSchema": [
+        {
+          "fields": [
+            {
+              "type": "string",
+              "name": "name",
+              "comment": null,
+              "primaryKey": null,
+              "defaultValue": null,
+              "nullable": null,
+              "properties": null,
+              "unSupport": false,
+              "outputDataType": "STRING"
+            },
+            {
+              "type": "int",
+              "name": "age",
+              "comment": null,
+              "primaryKey": null,
+              "defaultValue": null,
+              "nullable": null,
+              "properties": null,
+              "unSupport": false,
+              "outputDataType": "INT"
+            }
+          ],
+          "tableName": "fake_table",
+          "database": "fake_database"
+        }
+      ],
+      "transformOptions": {}
+    },
+    {
+      "name": "sink-console",
+      "type": "SINK",
+      "connectorType": null,
+      "tableOption": {
+        "databases": [
+          "console_fake_database"
+        ],
+        "tables": [
+          "console_fake_table"
+        ]
+      },
+      "selectTableFields": {
+        "tableFields": [],
+        "all": false
+      },
+      "dataSourceId": 2,
+      "config": "{\"log.print.data\":\"true\",\"log.print.delay.ms\":\"100\"}",
+      "transformOptions": {}
+    }
+  ],
+  "jobDAG": {
+    "edges": [
+      {
+        "inputPluginId": "source-fake-source",
+        "targetPluginId": "sink-console"
+      }
+    ]
+  }
+}

Reply via email to