This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git


The following commit(s) were added to refs/heads/main by this push:
     new c1a4e0c9 [Feature] [Seatunnel-web] Add support to execute job with parameter through API. (#193)
c1a4e0c9 is described below

commit c1a4e0c989627c866dad307800ea9029db9916f0
Author: Mohammad Arshad <[email protected]>
AuthorDate: Sun Aug 25 11:59:49 2024 +0530

    [Feature] [Seatunnel-web] Add support to execute job with parameter through API. (#193)
---
 .../app/controller/JobExecutorController.java      |   9 +-
 .../request/job/JobExecParam.java}                 |  25 ++-
 .../seatunnel/app/service/IJobExecutorService.java |   3 +-
 .../seatunnel/app/service/IJobInstanceService.java |  13 +-
 .../app/service/impl/JobExecutorServiceImpl.java   |   5 +-
 .../app/service/impl/JobInstanceServiceImpl.java   |  32 +++-
 .../app/service/impl/JobTaskServiceImpl.java       |   2 +-
 .../seatunnel/app/utils/JobExecParamUtil.java      |  88 ++++++++++
 .../controller/JobExecutorControllerWrapper.java   |  12 ++
 .../SeatunnelDatasourceControllerWrapper.java      |  17 ++
 .../app/test/JobExecutorControllerTest.java        | 187 +++++++++++++++++++++
 .../org/apache/seatunnel/app/utils/JobUtils.java   |  15 ++
 .../resources/jobs/mysql_source_mysql_sink.json    | 106 ++++++++++++
 13 files changed, 489 insertions(+), 25 deletions(-)

diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
index ee7773d1..77a7604f 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.app.controller;
 
 import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
 import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
 import org.apache.seatunnel.app.service.IJobExecutorService;
 import org.apache.seatunnel.app.service.IJobInstanceService;
@@ -27,6 +28,7 @@ import org.apache.seatunnel.server.common.SeatunnelException;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestAttribute;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
@@ -52,8 +54,9 @@ public class JobExecutorController {
     public Result<Long> jobExecutor(
             @ApiParam(value = "userId", required = true) 
@RequestAttribute("userId") Integer userId,
             @ApiParam(value = "jobDefineId", required = true) 
@RequestParam("jobDefineId")
-                    Long jobDefineId) {
-        return jobExecutorService.jobExecute(userId, jobDefineId);
+                    Long jobDefineId,
+            @RequestBody(required = false) JobExecParam executeParam) {
+        return jobExecutorService.jobExecute(userId, jobDefineId, 
executeParam);
     }
 
     @GetMapping("/resource")
@@ -64,7 +67,7 @@ public class JobExecutorController {
             throws IOException {
         try {
             JobExecutorRes executeResource =
-                    jobInstanceService.createExecuteResource(userId, 
jobDefineId);
+                    jobInstanceService.createExecuteResource(userId, 
jobDefineId, null);
             return Result.success(executeResource);
         } catch (Exception e) {
             log.error("Get the resource for job executor error", e);
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java
similarity index 61%
copy from 
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
copy to 
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java
index 01ce17e5..223429cc 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java
@@ -14,16 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.seatunnel.app.domain.request.job;
 
-package org.apache.seatunnel.app.service;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-import org.apache.seatunnel.app.common.Result;
+import java.util.Map;
 
-public interface IJobExecutorService {
-
-    Result<Long> jobExecute(Integer userId, Long jobDefineId);
-
-    Result<Void> jobPause(Integer userId, Long jobInstanceId);
-
-    Result<Void> jobStore(Integer userId, Long jobInstanceId);
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+// Job execution parameters
+public class JobExecParam {
+    // job name -> key -> value
+    private Map<String, String> env;
+    // task name -> key -> value
+    private Map<String, Map<String, String>> tasks;
+    // task name -> new datasource id
+    private Map<String, String> datasource;
 }
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
index 01ce17e5..ad227422 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
@@ -18,10 +18,11 @@
 package org.apache.seatunnel.app.service;
 
 import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
 
 public interface IJobExecutorService {
 
-    Result<Long> jobExecute(Integer userId, Long jobDefineId);
+    Result<Long> jobExecute(Integer userId, Long jobDefineId, JobExecParam 
executeParam);
 
     Result<Void> jobPause(Integer userId, Long jobInstanceId);
 
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
index 1b985777..e6db1d84 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.app.service;
 
 import org.apache.seatunnel.app.dal.entity.JobLine;
 import org.apache.seatunnel.app.dal.entity.JobTask;
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
 import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
 
 import lombok.NonNull;
@@ -26,9 +27,15 @@ import lombok.NonNull;
 import java.util.List;
 
 public interface IJobInstanceService {
-    JobExecutorRes createExecuteResource(@NonNull Integer userId, @NonNull 
Long jobDefineId);
-
-    String generateJobConfig(Long jobId, List<JobTask> tasks, List<JobLine> 
lines, String envStr);
+    JobExecutorRes createExecuteResource(
+            @NonNull Integer userId, @NonNull Long jobDefineId, JobExecParam 
executeParam);
+
+    String generateJobConfig(
+            Long jobId,
+            List<JobTask> tasks,
+            List<JobLine> lines,
+            String envStr,
+            JobExecParam executeParam);
 
     JobExecutorRes getExecuteResource(@NonNull Long jobEngineId);
 
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
index 29a07c12..fbcf0190 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.app.service.impl;
 import org.apache.seatunnel.app.common.Result;
 import org.apache.seatunnel.app.dal.dao.IJobInstanceDao;
 import org.apache.seatunnel.app.dal.entity.JobInstance;
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
 import org.apache.seatunnel.app.domain.response.engine.Engine;
 import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
 import org.apache.seatunnel.app.service.IJobExecutorService;
@@ -64,10 +65,10 @@ public class JobExecutorServiceImpl implements 
IJobExecutorService {
     @Resource private IJobInstanceDao jobInstanceDao;
 
     @Override
-    public Result<Long> jobExecute(Integer userId, Long jobDefineId) {
+    public Result<Long> jobExecute(Integer userId, Long jobDefineId, 
JobExecParam executeParam) {
 
         JobExecutorRes executeResource =
-                jobInstanceService.createExecuteResource(userId, jobDefineId);
+                jobInstanceService.createExecuteResource(userId, jobDefineId, 
executeParam);
         String jobConfig = executeResource.getJobConfig();
 
         String configFile = writeJobConfigIntoConfFile(jobConfig, jobDefineId);
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
index bf02de61..7a05da7e 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
@@ -45,6 +45,7 @@ import 
org.apache.seatunnel.app.domain.request.connector.BusinessMode;
 import org.apache.seatunnel.app.domain.request.connector.SceneMode;
 import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
 import org.apache.seatunnel.app.domain.request.job.DatabaseTableSchemaReq;
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
 import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
 import org.apache.seatunnel.app.domain.request.job.TableSchemaReq;
 import org.apache.seatunnel.app.domain.request.job.transform.Transform;
@@ -59,6 +60,7 @@ import org.apache.seatunnel.app.service.IJobMetricsService;
 import org.apache.seatunnel.app.service.IVirtualTableService;
 import 
org.apache.seatunnel.app.thirdparty.datasource.DataSourceConfigSwitcherUtils;
 import 
org.apache.seatunnel.app.thirdparty.transfrom.TransformConfigSwitcherUtils;
+import org.apache.seatunnel.app.utils.JobExecParamUtil;
 import org.apache.seatunnel.app.utils.SeaTunnelConfigUtil;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
@@ -125,7 +127,7 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
 
     @Override
     public JobExecutorRes createExecuteResource(
-            @NonNull Integer userId, @NonNull Long jobDefineId) {
+            @NonNull Integer userId, @NonNull Long jobDefineId, JobExecParam 
executeParam) {
         
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_RESOURCE, 
userId);
         log.info(
                 "receive createExecuteResource request, userId:{}, 
jobDefineId:{}",
@@ -134,7 +136,7 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
         JobDefinition job = jobDefinitionDao.getJob(jobDefineId);
         JobVersion latestVersion = jobVersionDao.getLatestVersion(job.getId());
         JobInstance jobInstance = new JobInstance();
-        String jobConfig = createJobConfig(latestVersion);
+        String jobConfig = createJobConfig(latestVersion, executeParam);
 
         try {
             jobInstance.setId(CodeGenerateUtils.getInstance().genCode());
@@ -163,11 +165,17 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
 
     @Override
     public String generateJobConfig(
-            Long jobId, List<JobTask> tasks, List<JobLine> lines, String 
envStr) {
+            Long jobId,
+            List<JobTask> tasks,
+            List<JobLine> lines,
+            String envStr,
+            JobExecParam executeParam) {
         checkSceneMode(tasks);
         BusinessMode businessMode =
                 
BusinessMode.valueOf(jobDefinitionDao.getJob(jobId).getJobType());
         Config envConfig = filterEmptyValue(ConfigFactory.parseString(envStr));
+        envConfig = JobExecParamUtil.updateEnvConfig(executeParam, envConfig);
+        JobExecParamUtil.updateDataSource(executeParam, tasks);
 
         Map<String, List<Config>> sourceMap = new LinkedHashMap<>();
         Map<String, List<Config>> transformMap = new LinkedHashMap<>();
@@ -222,6 +230,9 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
                                                         
ParsingMode.SHARDING.name()));
                             }
 
+                            config =
+                                    JobExecParamUtil.updateTaskConfig(
+                                            executeParam, config, 
task.getName());
                             Config mergeConfig =
                                     mergeTaskConfig(
                                             task,
@@ -230,7 +241,9 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
                                             businessMode,
                                             config,
                                             optionRule);
-
+                            mergeConfig =
+                                    JobExecParamUtil.updateQueryTaskConfig(
+                                            executeParam, mergeConfig, 
task.getName());
                             sourceMap
                                     .get(task.getConnectorType())
                                     .add(filterEmptyValue(mergeConfig));
@@ -260,6 +273,9 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
                         }
                         List<TableSchemaReq> inputSchemas = 
findInputSchemas(tasks, lines, task);
                         Config transformConfig = buildTransformConfig(task, 
config, inputSchemas);
+                        transformConfig =
+                                JobExecParamUtil.updateTaskConfig(
+                                        executeParam, transformConfig, 
task.getName());
                         transformMap
                                 .get(task.getConnectorType())
                                 .add(filterEmptyValue(transformConfig));
@@ -274,6 +290,9 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
                             if (!sinkMap.containsKey(task.getConnectorType())) 
{
                                 sinkMap.put(task.getConnectorType(), new 
ArrayList<>());
                             }
+                            config =
+                                    JobExecParamUtil.updateTaskConfig(
+                                            executeParam, config, 
task.getName());
                             Config mergeConfig =
                                     mergeTaskConfig(
                                             task,
@@ -492,10 +511,11 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
                 connectorConfig);
     }
 
-    private String createJobConfig(@NonNull JobVersion jobVersion) {
+    private String createJobConfig(@NonNull JobVersion jobVersion, 
JobExecParam executeParam) {
         List<JobTask> tasks = 
jobTaskDao.getTasksByVersionId(jobVersion.getId());
         List<JobLine> lines = 
jobLineDao.getLinesByVersionId(jobVersion.getId());
-        return generateJobConfig(jobVersion.getJobId(), tasks, lines, 
jobVersion.getEnv());
+        return generateJobConfig(
+                jobVersion.getJobId(), tasks, lines, jobVersion.getEnv(), 
executeParam);
     }
 
     private String getConnectorConfig(Map<String, List<Config>> connectorMap) {
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
index 0086cf77..abc4a7e4 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
@@ -267,7 +267,7 @@ public class JobTaskServiceImpl extends 
SeatunnelBaseServiceImpl implements IJob
             }
             // check the config can be generated
             jobInstanceService.generateJobConfig(
-                    version.getJobId(), tasks, lines, version.getEnv());
+                    version.getJobId(), tasks, lines, version.getEnv(), null);
             // TODO check schema output and input matched
         } catch (SeaTunnelException e) {
             log.error(ExceptionUtils.getMessage(e));
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java
new file mode 100644
index 00000000..5e236849
--- /dev/null
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.utils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.app.dal.entity.JobTask;
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
+
+import java.util.List;
+import java.util.Map;
+
+public class JobExecParamUtil {
+
+    public static Config updateEnvConfig(JobExecParam jobExecParam, Config 
envConfig) {
+        if (jobExecParam == null || jobExecParam.getEnv() == null) {
+            return envConfig;
+        }
+        return updateConfig(envConfig, jobExecParam.getEnv());
+    }
+
+    private static Config updateConfig(Config config, Map<String, String> 
properties) {
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            config =
+                    config.withValue(
+                            entry.getKey(), 
ConfigValueFactory.fromAnyRef(entry.getValue()));
+        }
+        return config;
+    }
+
+    public static Config updateTaskConfig(
+            JobExecParam jobExecParam, Config taskConfig, String taskName) {
+        if (jobExecParam == null
+                || jobExecParam.getTasks() == null
+                || jobExecParam.getTasks().get(taskName) == null) {
+            return taskConfig;
+        }
+        return updateConfig(taskConfig, jobExecParam.getTasks().get(taskName));
+    }
+
+    public static Config updateQueryTaskConfig(
+            JobExecParam jobExecParam, Config taskConfig, String taskName) {
+        if (jobExecParam == null
+                || jobExecParam.getTasks() == null
+                || jobExecParam.getTasks().get(taskName) == null) {
+            return taskConfig;
+        }
+        String query = jobExecParam.getTasks().get(taskName).get("query");
+        if (query != null) {
+            return taskConfig.withValue("query", 
ConfigValueFactory.fromAnyRef(query));
+        }
+        return taskConfig;
+    }
+
+    public static void updateDataSource(JobExecParam jobExecParam, 
List<JobTask> tasks) {
+        if (jobExecParam == null || jobExecParam.getDatasource() == null) {
+            return;
+        }
+        // Check current user has permission to access the datasource
+        jobExecParam
+                .getDatasource()
+                .forEach(
+                        (taskName, datasourceId) -> {
+                            tasks.stream()
+                                    .filter(task -> 
task.getName().equals(taskName))
+                                    .findFirst()
+                                    .ifPresent(
+                                            task ->
+                                                    task.setDataSourceId(
+                                                            
Long.parseLong(datasourceId)));
+                        });
+    }
+}
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
index af23fce5..881cf4ca 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
@@ -18,7 +18,9 @@ package org.apache.seatunnel.app.controller;
 
 import org.apache.seatunnel.app.common.Result;
 import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
 import org.apache.seatunnel.app.utils.JSONTestUtils;
+import org.apache.seatunnel.app.utils.JSONUtils;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 
@@ -33,6 +35,16 @@ public class JobExecutorControllerWrapper extends 
SeatunnelWebTestingBase {
         return JSONTestUtils.parseObject(response, new 
TypeReference<Result<Long>>() {});
     }
 
+    public Result<Long> jobExecutor(Long jobDefineId, JobExecParam 
jobExecParam) {
+        String requestBody = JSONUtils.toPrettyJsonString(jobExecParam);
+        String response =
+                sendRequest(
+                        urlWithParam("job/executor/execute?jobDefineId=" + 
jobDefineId),
+                        requestBody,
+                        "POST");
+        return JSONTestUtils.parseObject(response, new 
TypeReference<Result<Long>>() {});
+    }
+
     public Result<Void> resource(Long jobDefineId) {
         String response =
                 sendRequest(urlWithParam("job/executor/resource?jobDefineId=" 
+ jobDefineId));
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java
 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java
index effca68d..4fe17e54 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java
@@ -46,6 +46,13 @@ public class SeatunnelDatasourceControllerWrapper extends 
SeatunnelWebTestingBas
         return result.getData();
     }
 
+    public String createMysqlDatasource(String datasourceName) {
+        DatasourceReq req = getMysqlDatasource(datasourceName);
+        Result<String> result = createDatasource(req);
+        assertTrue(result.isSuccess());
+        return result.getData();
+    }
+
     public DatasourceReq getFakeSourceDatasourceReq(String datasourceName) {
         DatasourceReq req = new DatasourceReq();
         req.setDatasourceName(datasourceName);
@@ -104,4 +111,14 @@ public class SeatunnelDatasourceControllerWrapper extends 
SeatunnelWebTestingBas
         return JSONTestUtils.parseObject(
                 response, new TypeReference<Result<PageInfo<DatasourceRes>>>() 
{});
     }
+
+    public DatasourceReq getMysqlDatasource(String datasourceName) {
+        DatasourceReq req = new DatasourceReq();
+        req.setDatasourceName(datasourceName);
+        req.setPluginName("JDBC-Mysql");
+        req.setDescription(datasourceName + " description");
+        req.setDatasourceConfig(
+                
"{\"url\":\"jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true\",\"driver\":\"com.mysql.cj.jdbc.Driver\",\"user\":\"someUser\",\"password\":\"somePassword\"}");
+        return req;
+    }
 }
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
index 4a01df28..7c942e1d 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
@@ -18,7 +18,13 @@ package org.apache.seatunnel.app.test;
 
 import org.apache.seatunnel.app.common.Result;
 import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
+import org.apache.seatunnel.app.controller.JobControllerWrapper;
 import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
+import 
org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
+import org.apache.seatunnel.app.domain.request.datasource.DatasourceReq;
+import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
+import org.apache.seatunnel.app.domain.request.job.PluginConfig;
 import 
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
 import org.apache.seatunnel.app.utils.JobUtils;
 
@@ -26,20 +32,30 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class JobExecutorControllerTest {
     private static final SeaTunnelWebCluster seaTunnelWebCluster = new 
SeaTunnelWebCluster();
     private static JobExecutorControllerWrapper jobExecutorControllerWrapper;
     private static final String uniqueId = "_" + System.currentTimeMillis();
+    private static SeatunnelDatasourceControllerWrapper 
seatunnelDatasourceControllerWrapper;
+    private static JobControllerWrapper jobControllerWrapper;
 
     @BeforeAll
     public static void setUp() {
         seaTunnelWebCluster.start();
         jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
+        seatunnelDatasourceControllerWrapper = new 
SeatunnelDatasourceControllerWrapper();
+        jobControllerWrapper = new JobControllerWrapper();
     }
 
     @Test
@@ -57,6 +73,166 @@ public class JobExecutorControllerTest {
         assertEquals(5, listResult.getData().get(0).getWriteRowCount());
     }
 
+    @Test
+    public void executeJobWithParameters() {
+        String jobName = "execJobWithParam" + uniqueId;
+        long jobVersionId = JobUtils.createJob(jobName);
+        Result<Long> result = 
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
+        assertTrue(result.isSuccess());
+        assertTrue(result.getData() > 0);
+        Result<List<JobPipelineDetailMetricsRes>> listResult =
+                JobUtils.waitForJobCompletion(result.getData());
+        assertEquals(1, listResult.getData().size());
+        assertEquals("FINISHED", listResult.getData().get(0).getStatus());
+        assertEquals(5, listResult.getData().get(0).getReadRowCount());
+        assertEquals(5, listResult.getData().get(0).getWriteRowCount());
+        String generatedJobFile = 
getGenerateJobFile(String.valueOf(jobVersionId));
+        assertTrue(generatedJobFile.contains("\"is_regex\"=\"false\""));
+        
assertTrue(generatedJobFile.contains("\"log.print.delay.ms\"=\"100\""));
+
+        JobExecParam jobExecParam = new JobExecParam();
+
+        Map<String, String> envConf = new HashMap<>();
+        envConf.put("job.name", "executeJobWithParameters");
+        jobExecParam.setEnv(envConf);
+        Map<String, Map<String, String>> tasks = new HashMap<>();
+        jobExecParam.setTasks(tasks);
+
+        int numberOfRecords = 100;
+        Map<String, String> task1Config = new HashMap<>();
+        task1Config.put("row.num", String.valueOf(numberOfRecords));
+        tasks.put("source-fakesource", task1Config);
+
+        Map<String, String> task2Config = new HashMap<>();
+        task2Config.put("replace_first", "true");
+        task2Config.put("is_regex", "true");
+        tasks.put("transform-replace", task2Config);
+
+        Map<String, String> task3Config = new HashMap<>();
+        task3Config.put("log.print.delay.ms", "99");
+        tasks.put("sink-console", task3Config);
+
+        result = jobExecutorControllerWrapper.jobExecutor(jobVersionId, 
jobExecParam);
+        assertTrue(result.isSuccess());
+        assertTrue(result.getData() > 0);
+        listResult = JobUtils.waitForJobCompletion(result.getData());
+        assertEquals(1, listResult.getData().size());
+        assertEquals("FINISHED", listResult.getData().get(0).getStatus());
+        assertEquals(numberOfRecords, 
listResult.getData().get(0).getReadRowCount());
+        assertEquals(numberOfRecords, 
listResult.getData().get(0).getWriteRowCount());
+
+        // Do a few validations on the generated job file
+        generatedJobFile = getGenerateJobFile(String.valueOf(jobVersionId));
+        assertTrue(generatedJobFile.contains("\"is_regex\"=\"true\""));
+        assertTrue(generatedJobFile.contains("\"replace_first\"=\"true\""));
+        // database properties except query can not be updated
+        
assertFalse(generatedJobFile.contains("\"log.print.delay.ms\"=\"99\""));
+        
assertTrue(generatedJobFile.contains("\"job.name\"=executeJobWithParameters"));
+    }
+
+    @Test
+    public void executeJobWithParameters_AllowQueryUpdate() {
+        String jobName = "execJobUpdateQuery" + uniqueId;
+        JobCreateReq jobCreateReq = 
JobUtils.populateMySQLJobCreateReqFromFile();
+        jobCreateReq.getJobConfig().setName(jobName);
+        jobCreateReq.getJobConfig().setDescription(jobName + " description");
+        String datasourceName = "execJobUpdateQuery_db" + uniqueId;
+        String mysqlDatasourceId =
+                
seatunnelDatasourceControllerWrapper.createMysqlDatasource(datasourceName);
+        for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) {
+            pluginConfig.setDataSourceId(Long.parseLong(mysqlDatasourceId));
+        }
+        Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
+        assertTrue(job.isSuccess());
+        Long jobVersionId = job.getData();
+        Result<Long> result = 
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
+        // Fails because of the wrong credentials of the database.
+        assertFalse(result.isSuccess());
+        String generatedJobFile = 
getGenerateJobFile(String.valueOf(jobVersionId));
+        assertTrue(
+                generatedJobFile.contains(
+                        "query=\"SELECT `name`, `age` FROM 
`test`.`test_table`\""));
+        assertTrue(generatedJobFile.contains("user=someUser"));
+        assertTrue(generatedJobFile.contains("password=somePassword"));
+
+        JobExecParam jobExecParam = new JobExecParam();
+        Map<String, Map<String, String>> tasks = new HashMap<>();
+        jobExecParam.setTasks(tasks);
+
+        Map<String, String> task1Config = new HashMap<>();
+        task1Config.put("query", "SELECT `name`, `age` FROM 
`test`.`test_table` LIMIT 10");
+        task1Config.put("user", "otherUser");
+        task1Config.put("password", "otherPassword");
+        tasks.put("mysql_source_1", task1Config);
+
+        result = jobExecutorControllerWrapper.jobExecutor(jobVersionId, 
jobExecParam);
+        assertFalse(result.isSuccess());
+        // query should be changed but other database details should not be 
changed.
+        generatedJobFile = getGenerateJobFile(String.valueOf(jobVersionId));
+        assertTrue(
+                generatedJobFile.contains(
+                        "query=\"SELECT `name`, `age` FROM `test`.`test_table` 
LIMIT 10\""));
+        assertTrue(generatedJobFile.contains("user=someUser"));
+        assertTrue(generatedJobFile.contains("password=somePassword"));
+    }
+
+    @Test
+    public void executeJobWithParameters_ChangeDatabase() {
+        String jobName = "execJobChangeDatabase" + uniqueId;
+        JobCreateReq jobCreateReq = 
JobUtils.populateMySQLJobCreateReqFromFile();
+        jobCreateReq.getJobConfig().setName(jobName);
+        jobCreateReq.getJobConfig().setDescription(jobName + " description");
+        String datasourceName = "execJobChangeDatabase_db_1" + uniqueId;
+        String mysqlDatasourceId =
+                
seatunnelDatasourceControllerWrapper.createMysqlDatasource(datasourceName);
+        for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) {
+            pluginConfig.setDataSourceId(Long.parseLong(mysqlDatasourceId));
+        }
+        Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
+        assertTrue(job.isSuccess());
+        Long jobVersionId = job.getData();
+        Result<Long> result = 
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
+        // Fails because of the wrong credentials of the database.
+        assertFalse(result.isSuccess());
+        String generatedJobFile = 
getGenerateJobFile(String.valueOf(jobVersionId));
+        assertTrue(
+                generatedJobFile.contains(
+                        "query=\"SELECT `name`, `age` FROM 
`test`.`test_table`\""));
+        assertTrue(generatedJobFile.contains("user=someUser"));
+        assertTrue(generatedJobFile.contains("password=somePassword"));
+
+        String datasourceName2 = "execJobChangeDatabase_db_2" + uniqueId;
+        DatasourceReq req = getDatasourceReq(datasourceName2);
+
+        Result<String> datasource = 
seatunnelDatasourceControllerWrapper.createDatasource(req);
+        assertTrue(datasource.isSuccess());
+        JobExecParam jobExecParam = new JobExecParam();
+        Map<String, String> dbConfigs = new HashMap<>();
+        jobExecParam.setDatasource(dbConfigs);
+
+        dbConfigs.put("mysql_source_1", datasource.getData());
+
+        result = jobExecutorControllerWrapper.jobExecutor(jobVersionId, 
jobExecParam);
+        assertFalse(result.isSuccess());
+        // query should stay unchanged while user and password come from the 
new datasource.
+        generatedJobFile = getGenerateJobFile(String.valueOf(jobVersionId));
+        assertTrue(
+                generatedJobFile.contains(
+                        "query=\"SELECT `name`, `age` FROM 
`test`.`test_table`\""));
+        assertTrue(generatedJobFile.contains("user=someUser2"));
+        assertTrue(generatedJobFile.contains("password=somePassword2"));
+    }
+
+    private static DatasourceReq getDatasourceReq(String datasourceName2) {
+        DatasourceReq req = new DatasourceReq();
+        req.setDatasourceName(datasourceName2);
+        req.setPluginName("JDBC-Mysql");
+        req.setDescription(datasourceName2 + " description");
+        req.setDatasourceConfig(
+                
"{\"url\":\"jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true\",\"driver\":\"com.mysql.cj.jdbc.Driver\",\"user\":\"someUser2\",\"password\":\"somePassword2\"}");
+        return req;
+    }
+
     @Test
     public void restoreJob_shouldReturnSuccess_whenValidRequest() {
         String jobName = "jobRestore" + uniqueId;
@@ -71,4 +247,15 @@ public class JobExecutorControllerTest {
     public static void tearDown() {
         seaTunnelWebCluster.stop();
     }
+
+    private String getGenerateJobFile(String jobId) {
+        String filePath = "profile/" + jobId + ".conf";
+        String jsonContent;
+        try {
+            jsonContent = new String(Files.readAllBytes(Paths.get(filePath)));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return jsonContent;
+    }
 }
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
index 0a530d3b..a9214baf 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
@@ -24,10 +24,14 @@ import 
org.apache.seatunnel.app.controller.JobTaskControllerWrapper;
 import 
org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
 import org.apache.seatunnel.app.domain.request.job.Edge;
 import org.apache.seatunnel.app.domain.request.job.JobConfig;
+import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
 import org.apache.seatunnel.app.domain.request.job.JobDAG;
 import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes;
 import 
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -138,4 +142,15 @@ public class JobUtils {
         assertTrue(jobTaskCheckResResult.isSuccess());
         return jobVersionId;
     }
+
+    public static JobCreateReq populateMySQLJobCreateReqFromFile() {
+        String filePath = 
"src/test/resources/jobs/mysql_source_mysql_sink.json";
+        String jsonContent;
+        try {
+            jsonContent = new String(Files.readAllBytes(Paths.get(filePath)));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return JSONTestUtils.parseObject(jsonContent, JobCreateReq.class);
+    }
 }
diff --git 
a/seatunnel-web-it/src/test/resources/jobs/mysql_source_mysql_sink.json 
b/seatunnel-web-it/src/test/resources/jobs/mysql_source_mysql_sink.json
new file mode 100644
index 00000000..967244f5
--- /dev/null
+++ b/seatunnel-web-it/src/test/resources/jobs/mysql_source_mysql_sink.json
@@ -0,0 +1,106 @@
+{
+  "jobConfig": {
+    "name": "mysql_source_mysql_sink",
+    "description": "mysql_source_mysql_sink description",
+    "engine": "SeaTunnel",
+    "env": {
+      "job.mode": "BATCH",
+      "job.name": "SeaTunnel_Job",
+      "jars": "",
+      "checkpoint.interval": "",
+      "checkpoint.timeout": "",
+      "read_limit.rows_per_second": "",
+      "read_limit.bytes_per_second": "",
+      "custom_parameters": ""
+    }
+  },
+  "pluginConfigs": [
+    {
+      "pluginId": "1724412762429155",
+      "name": "mysql_source_1",
+      "type": "SOURCE",
+      "connectorType": null,
+      "tableOption": {
+        "databases": [
+          "test"
+        ],
+        "tables": [
+          "test_table"
+        ]
+      },
+      "selectTableFields": {
+        "tableFields": [
+          "name",
+          "age"
+        ],
+        "all": true
+      },
+      "dataSourceId": 14717667385504,
+      "sceneMode": "SINGLE_TABLE",
+      "config": 
"{\"query\":\"\",\"connection_check_timeout_sec\":30,\"fetch_size\":\"\",\"partition_column\":\"\",\"partition_upper_bound\":\"\",\"partition_lower_bound\":\"\",\"partition_num\":\"\",\"compatible_mode\":\"\",\"properties\":\"\",\"table_path\":\"\",\"where_condition\":\"\",\"table_list\":\"\",\"split.size\":8096,\"split.even-distribution.factor.upper-bound\":100,\"split.even-distribution.factor.lower-bound\":0.05,\"split.sample-sharding.threshold\":1000,\"split.inverse-sa
 [...]
+      "outputSchema": [
+        {
+          "fields": [
+            {
+              "type": "LONGTEXT",
+              "name": "name",
+              "comment": "",
+              "primaryKey": false,
+              "defaultValue": null,
+              "nullable": false,
+              "properties": null,
+              "unSupport": false,
+              "outputDataType": "STRING"
+            },
+            {
+              "type": "INT",
+              "name": "age",
+              "comment": "",
+              "primaryKey": false,
+              "defaultValue": null,
+              "nullable": false,
+              "properties": null,
+              "unSupport": false,
+              "outputDataType": "INT"
+            }
+          ],
+          "tableName": "test_table",
+          "database": "test"
+        }
+      ],
+      "transformOptions": {}
+    },
+    {
+      "pluginId": "17244128298414uc",
+      "name": "mysql_sink_1",
+      "type": "SINK",
+      "connectorType": null,
+      "tableOption": {
+        "databases": [
+          "test"
+        ],
+        "tables": [
+          "test_table"
+        ]
+      },
+      "selectTableFields": {
+        "tableFields": [
+          "name",
+          "age"
+        ],
+        "all": true
+      },
+      "dataSourceId": 14717667385504,
+      "config": 
"{\"query\":\"\",\"schema_save_mode\":\"CREATE_SCHEMA_WHEN_NOT_EXIST\",\"data_save_mode\":\"APPEND_DATA\",\"custom_sql\":\"\",\"connection_check_timeout_sec\":30,\"batch_size\":1000,\"is_exactly_once\":\"false\",\"xa_data_source_class_name\":\"\",\"max_commit_attempts\":3,\"transaction_timeout_sec\":-1,\"max_retries\":\"1\",\"auto_commit\":\"true\",\"support_upsert_by_query_primary_key_exist\":\"false\",\"primary_keys\":\"\",\"compatible_mode\":\"\",\"multi_table_sink_rep
 [...]
+      "transformOptions": {}
+    }
+  ],
+  "jobDAG": {
+    "edges": [
+      {
+        "inputPluginId": "mysql_source_1",
+        "targetPluginId": "mysql_sink_1"
+      }
+    ]
+  }
+}
\ No newline at end of file


Reply via email to