This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new c1a4e0c9 [Feature] [Seatunnel-web] Add support to execute job with
parameter through API. (#193)
c1a4e0c9 is described below
commit c1a4e0c989627c866dad307800ea9029db9916f0
Author: Mohammad Arshad <[email protected]>
AuthorDate: Sun Aug 25 11:59:49 2024 +0530
[Feature] [Seatunnel-web] Add support to execute job with parameter through
API. (#193)
---
.../app/controller/JobExecutorController.java | 9 +-
.../request/job/JobExecParam.java} | 25 ++-
.../seatunnel/app/service/IJobExecutorService.java | 3 +-
.../seatunnel/app/service/IJobInstanceService.java | 13 +-
.../app/service/impl/JobExecutorServiceImpl.java | 5 +-
.../app/service/impl/JobInstanceServiceImpl.java | 32 +++-
.../app/service/impl/JobTaskServiceImpl.java | 2 +-
.../seatunnel/app/utils/JobExecParamUtil.java | 88 ++++++++++
.../controller/JobExecutorControllerWrapper.java | 12 ++
.../SeatunnelDatasourceControllerWrapper.java | 17 ++
.../app/test/JobExecutorControllerTest.java | 187 +++++++++++++++++++++
.../org/apache/seatunnel/app/utils/JobUtils.java | 15 ++
.../resources/jobs/mysql_source_mysql_sink.json | 106 ++++++++++++
13 files changed, 489 insertions(+), 25 deletions(-)
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
index ee7773d1..77a7604f 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.app.controller;
import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
import org.apache.seatunnel.app.service.IJobExecutorService;
import org.apache.seatunnel.app.service.IJobInstanceService;
@@ -27,6 +28,7 @@ import org.apache.seatunnel.server.common.SeatunnelException;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
+import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@@ -52,8 +54,9 @@ public class JobExecutorController {
public Result<Long> jobExecutor(
@ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobDefineId", required = true)
@RequestParam("jobDefineId")
- Long jobDefineId) {
- return jobExecutorService.jobExecute(userId, jobDefineId);
+ Long jobDefineId,
+ @RequestBody(required = false) JobExecParam executeParam) {
+ return jobExecutorService.jobExecute(userId, jobDefineId,
executeParam);
}
@GetMapping("/resource")
@@ -64,7 +67,7 @@ public class JobExecutorController {
throws IOException {
try {
JobExecutorRes executeResource =
- jobInstanceService.createExecuteResource(userId,
jobDefineId);
+ jobInstanceService.createExecuteResource(userId,
jobDefineId, null);
return Result.success(executeResource);
} catch (Exception e) {
log.error("Get the resource for job executor error", e);
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java
similarity index 61%
copy from
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
copy to
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java
index 01ce17e5..223429cc 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java
@@ -14,16 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.seatunnel.app.domain.request.job;
-package org.apache.seatunnel.app.service;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
-import org.apache.seatunnel.app.common.Result;
+import java.util.Map;
-public interface IJobExecutorService {
-
- Result<Long> jobExecute(Integer userId, Long jobDefineId);
-
- Result<Void> jobPause(Integer userId, Long jobInstanceId);
-
- Result<Void> jobStore(Integer userId, Long jobInstanceId);
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+// Job execution parameters
+public class JobExecParam {
+ // env option key -> override value
+ private Map<String, String> env;
+ // task name -> key -> value
+ private Map<String, Map<String, String>> tasks;
+ // task name -> new datasource id
+ private Map<String, String> datasource;
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
index 01ce17e5..ad227422 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
@@ -18,10 +18,11 @@
package org.apache.seatunnel.app.service;
import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
public interface IJobExecutorService {
- Result<Long> jobExecute(Integer userId, Long jobDefineId);
+ Result<Long> jobExecute(Integer userId, Long jobDefineId, JobExecParam
executeParam);
Result<Void> jobPause(Integer userId, Long jobInstanceId);
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
index 1b985777..e6db1d84 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.app.service;
import org.apache.seatunnel.app.dal.entity.JobLine;
import org.apache.seatunnel.app.dal.entity.JobTask;
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
import lombok.NonNull;
@@ -26,9 +27,15 @@ import lombok.NonNull;
import java.util.List;
public interface IJobInstanceService {
- JobExecutorRes createExecuteResource(@NonNull Integer userId, @NonNull
Long jobDefineId);
-
- String generateJobConfig(Long jobId, List<JobTask> tasks, List<JobLine>
lines, String envStr);
+ JobExecutorRes createExecuteResource(
+ @NonNull Integer userId, @NonNull Long jobDefineId, JobExecParam
executeParam);
+
+ String generateJobConfig(
+ Long jobId,
+ List<JobTask> tasks,
+ List<JobLine> lines,
+ String envStr,
+ JobExecParam executeParam);
JobExecutorRes getExecuteResource(@NonNull Long jobEngineId);
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
index 29a07c12..fbcf0190 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.app.service.impl;
import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.app.dal.dao.IJobInstanceDao;
import org.apache.seatunnel.app.dal.entity.JobInstance;
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
import org.apache.seatunnel.app.domain.response.engine.Engine;
import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
import org.apache.seatunnel.app.service.IJobExecutorService;
@@ -64,10 +65,10 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
@Resource private IJobInstanceDao jobInstanceDao;
@Override
- public Result<Long> jobExecute(Integer userId, Long jobDefineId) {
+ public Result<Long> jobExecute(Integer userId, Long jobDefineId,
JobExecParam executeParam) {
JobExecutorRes executeResource =
- jobInstanceService.createExecuteResource(userId, jobDefineId);
+ jobInstanceService.createExecuteResource(userId, jobDefineId,
executeParam);
String jobConfig = executeResource.getJobConfig();
String configFile = writeJobConfigIntoConfFile(jobConfig, jobDefineId);
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
index bf02de61..7a05da7e 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
@@ -45,6 +45,7 @@ import
org.apache.seatunnel.app.domain.request.connector.BusinessMode;
import org.apache.seatunnel.app.domain.request.connector.SceneMode;
import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
import org.apache.seatunnel.app.domain.request.job.DatabaseTableSchemaReq;
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
import org.apache.seatunnel.app.domain.request.job.TableSchemaReq;
import org.apache.seatunnel.app.domain.request.job.transform.Transform;
@@ -59,6 +60,7 @@ import org.apache.seatunnel.app.service.IJobMetricsService;
import org.apache.seatunnel.app.service.IVirtualTableService;
import
org.apache.seatunnel.app.thirdparty.datasource.DataSourceConfigSwitcherUtils;
import
org.apache.seatunnel.app.thirdparty.transfrom.TransformConfigSwitcherUtils;
+import org.apache.seatunnel.app.utils.JobExecParamUtil;
import org.apache.seatunnel.app.utils.SeaTunnelConfigUtil;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
@@ -125,7 +127,7 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
@Override
public JobExecutorRes createExecuteResource(
- @NonNull Integer userId, @NonNull Long jobDefineId) {
+ @NonNull Integer userId, @NonNull Long jobDefineId, JobExecParam
executeParam) {
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_RESOURCE,
userId);
log.info(
"receive createExecuteResource request, userId:{},
jobDefineId:{}",
@@ -134,7 +136,7 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
JobDefinition job = jobDefinitionDao.getJob(jobDefineId);
JobVersion latestVersion = jobVersionDao.getLatestVersion(job.getId());
JobInstance jobInstance = new JobInstance();
- String jobConfig = createJobConfig(latestVersion);
+ String jobConfig = createJobConfig(latestVersion, executeParam);
try {
jobInstance.setId(CodeGenerateUtils.getInstance().genCode());
@@ -163,11 +165,17 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
@Override
public String generateJobConfig(
- Long jobId, List<JobTask> tasks, List<JobLine> lines, String
envStr) {
+ Long jobId,
+ List<JobTask> tasks,
+ List<JobLine> lines,
+ String envStr,
+ JobExecParam executeParam) {
checkSceneMode(tasks);
BusinessMode businessMode =
BusinessMode.valueOf(jobDefinitionDao.getJob(jobId).getJobType());
Config envConfig = filterEmptyValue(ConfigFactory.parseString(envStr));
+ envConfig = JobExecParamUtil.updateEnvConfig(executeParam, envConfig);
+ JobExecParamUtil.updateDataSource(executeParam, tasks);
Map<String, List<Config>> sourceMap = new LinkedHashMap<>();
Map<String, List<Config>> transformMap = new LinkedHashMap<>();
@@ -222,6 +230,9 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
ParsingMode.SHARDING.name()));
}
+ config =
+ JobExecParamUtil.updateTaskConfig(
+ executeParam, config,
task.getName());
Config mergeConfig =
mergeTaskConfig(
task,
@@ -230,7 +241,9 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
businessMode,
config,
optionRule);
-
+ mergeConfig =
+ JobExecParamUtil.updateQueryTaskConfig(
+ executeParam, mergeConfig,
task.getName());
sourceMap
.get(task.getConnectorType())
.add(filterEmptyValue(mergeConfig));
@@ -260,6 +273,9 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
}
List<TableSchemaReq> inputSchemas =
findInputSchemas(tasks, lines, task);
Config transformConfig = buildTransformConfig(task,
config, inputSchemas);
+ transformConfig =
+ JobExecParamUtil.updateTaskConfig(
+ executeParam, transformConfig,
task.getName());
transformMap
.get(task.getConnectorType())
.add(filterEmptyValue(transformConfig));
@@ -274,6 +290,9 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
if (!sinkMap.containsKey(task.getConnectorType()))
{
sinkMap.put(task.getConnectorType(), new
ArrayList<>());
}
+ config =
+ JobExecParamUtil.updateTaskConfig(
+ executeParam, config,
task.getName());
Config mergeConfig =
mergeTaskConfig(
task,
@@ -492,10 +511,11 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
connectorConfig);
}
- private String createJobConfig(@NonNull JobVersion jobVersion) {
+ private String createJobConfig(@NonNull JobVersion jobVersion,
JobExecParam executeParam) {
List<JobTask> tasks =
jobTaskDao.getTasksByVersionId(jobVersion.getId());
List<JobLine> lines =
jobLineDao.getLinesByVersionId(jobVersion.getId());
- return generateJobConfig(jobVersion.getJobId(), tasks, lines,
jobVersion.getEnv());
+ return generateJobConfig(
+ jobVersion.getJobId(), tasks, lines, jobVersion.getEnv(),
executeParam);
}
private String getConnectorConfig(Map<String, List<Config>> connectorMap) {
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
index 0086cf77..abc4a7e4 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
@@ -267,7 +267,7 @@ public class JobTaskServiceImpl extends
SeatunnelBaseServiceImpl implements IJob
}
// check the config can be generated
jobInstanceService.generateJobConfig(
- version.getJobId(), tasks, lines, version.getEnv());
+ version.getJobId(), tasks, lines, version.getEnv(), null);
// TODO check schema output and input matched
} catch (SeaTunnelException e) {
log.error(ExceptionUtils.getMessage(e));
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java
new file mode 100644
index 00000000..5e236849
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.utils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.app.dal.entity.JobTask;
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
+
+import java.util.List;
+import java.util.Map;
+
+public class JobExecParamUtil {
+
+ public static Config updateEnvConfig(JobExecParam jobExecParam, Config
envConfig) {
+ if (jobExecParam == null || jobExecParam.getEnv() == null) {
+ return envConfig;
+ }
+ return updateConfig(envConfig, jobExecParam.getEnv());
+ }
+
+ private static Config updateConfig(Config config, Map<String, String>
properties) {
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ config =
+ config.withValue(
+ entry.getKey(),
ConfigValueFactory.fromAnyRef(entry.getValue()));
+ }
+ return config;
+ }
+
+ public static Config updateTaskConfig(
+ JobExecParam jobExecParam, Config taskConfig, String taskName) {
+ if (jobExecParam == null
+ || jobExecParam.getTasks() == null
+ || jobExecParam.getTasks().get(taskName) == null) {
+ return taskConfig;
+ }
+ return updateConfig(taskConfig, jobExecParam.getTasks().get(taskName));
+ }
+
+ public static Config updateQueryTaskConfig(
+ JobExecParam jobExecParam, Config taskConfig, String taskName) {
+ if (jobExecParam == null
+ || jobExecParam.getTasks() == null
+ || jobExecParam.getTasks().get(taskName) == null) {
+ return taskConfig;
+ }
+ String query = jobExecParam.getTasks().get(taskName).get("query");
+ if (query != null) {
+ return taskConfig.withValue("query",
ConfigValueFactory.fromAnyRef(query));
+ }
+ return taskConfig;
+ }
+
+ public static void updateDataSource(JobExecParam jobExecParam,
List<JobTask> tasks) {
+ if (jobExecParam == null || jobExecParam.getDatasource() == null) {
+ return;
+ }
+ // TODO: no check is done here yet; verify the current user has permission to access the datasource
+ jobExecParam
+ .getDatasource()
+ .forEach(
+ (taskName, datasourceId) -> {
+ tasks.stream()
+ .filter(task ->
task.getName().equals(taskName))
+ .findFirst()
+ .ifPresent(
+ task ->
+ task.setDataSourceId(
+
Long.parseLong(datasourceId)));
+ });
+ }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
index af23fce5..881cf4ca 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
@@ -18,7 +18,9 @@ package org.apache.seatunnel.app.controller;
import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
import org.apache.seatunnel.app.utils.JSONTestUtils;
+import org.apache.seatunnel.app.utils.JSONUtils;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -33,6 +35,16 @@ public class JobExecutorControllerWrapper extends
SeatunnelWebTestingBase {
return JSONTestUtils.parseObject(response, new
TypeReference<Result<Long>>() {});
}
+ public Result<Long> jobExecutor(Long jobDefineId, JobExecParam
jobExecParam) {
+ String requestBody = JSONUtils.toPrettyJsonString(jobExecParam);
+ String response =
+ sendRequest(
+ urlWithParam("job/executor/execute?jobDefineId=" +
jobDefineId),
+ requestBody,
+ "POST");
+ return JSONTestUtils.parseObject(response, new
TypeReference<Result<Long>>() {});
+ }
+
public Result<Void> resource(Long jobDefineId) {
String response =
sendRequest(urlWithParam("job/executor/resource?jobDefineId="
+ jobDefineId));
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java
index effca68d..4fe17e54 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java
@@ -46,6 +46,13 @@ public class SeatunnelDatasourceControllerWrapper extends
SeatunnelWebTestingBas
return result.getData();
}
+ public String createMysqlDatasource(String datasourceName) {
+ DatasourceReq req = getMysqlDatasource(datasourceName);
+ Result<String> result = createDatasource(req);
+ assertTrue(result.isSuccess());
+ return result.getData();
+ }
+
public DatasourceReq getFakeSourceDatasourceReq(String datasourceName) {
DatasourceReq req = new DatasourceReq();
req.setDatasourceName(datasourceName);
@@ -104,4 +111,14 @@ public class SeatunnelDatasourceControllerWrapper extends
SeatunnelWebTestingBas
return JSONTestUtils.parseObject(
response, new TypeReference<Result<PageInfo<DatasourceRes>>>()
{});
}
+
+ public DatasourceReq getMysqlDatasource(String datasourceName) {
+ DatasourceReq req = new DatasourceReq();
+ req.setDatasourceName(datasourceName);
+ req.setPluginName("JDBC-Mysql");
+ req.setDescription(datasourceName + " description");
+ req.setDatasourceConfig(
+
"{\"url\":\"jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true\",\"driver\":\"com.mysql.cj.jdbc.Driver\",\"user\":\"someUser\",\"password\":\"somePassword\"}");
+ return req;
+ }
}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
index 4a01df28..7c942e1d 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
@@ -18,7 +18,13 @@ package org.apache.seatunnel.app.test;
import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
+import org.apache.seatunnel.app.controller.JobControllerWrapper;
import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
+import
org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
+import org.apache.seatunnel.app.domain.request.datasource.DatasourceReq;
+import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
+import org.apache.seatunnel.app.domain.request.job.PluginConfig;
import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
import org.apache.seatunnel.app.utils.JobUtils;
@@ -26,20 +32,30 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class JobExecutorControllerTest {
private static final SeaTunnelWebCluster seaTunnelWebCluster = new
SeaTunnelWebCluster();
private static JobExecutorControllerWrapper jobExecutorControllerWrapper;
private static final String uniqueId = "_" + System.currentTimeMillis();
+ private static SeatunnelDatasourceControllerWrapper
seatunnelDatasourceControllerWrapper;
+ private static JobControllerWrapper jobControllerWrapper;
@BeforeAll
public static void setUp() {
seaTunnelWebCluster.start();
jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
+ seatunnelDatasourceControllerWrapper = new
SeatunnelDatasourceControllerWrapper();
+ jobControllerWrapper = new JobControllerWrapper();
}
@Test
@@ -57,6 +73,166 @@ public class JobExecutorControllerTest {
assertEquals(5, listResult.getData().get(0).getWriteRowCount());
}
+ @Test
+ public void executeJobWithParameters() {
+ String jobName = "execJobWithParam" + uniqueId;
+ long jobVersionId = JobUtils.createJob(jobName);
+ Result<Long> result =
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
+ assertTrue(result.isSuccess());
+ assertTrue(result.getData() > 0);
+ Result<List<JobPipelineDetailMetricsRes>> listResult =
+ JobUtils.waitForJobCompletion(result.getData());
+ assertEquals(1, listResult.getData().size());
+ assertEquals("FINISHED", listResult.getData().get(0).getStatus());
+ assertEquals(5, listResult.getData().get(0).getReadRowCount());
+ assertEquals(5, listResult.getData().get(0).getWriteRowCount());
+ String generatedJobFile =
getGenerateJobFile(String.valueOf(jobVersionId));
+ assertTrue(generatedJobFile.contains("\"is_regex\"=\"false\""));
+
assertTrue(generatedJobFile.contains("\"log.print.delay.ms\"=\"100\""));
+
+ JobExecParam jobExecParam = new JobExecParam();
+
+ Map<String, String> envConf = new HashMap<>();
+ envConf.put("job.name", "executeJobWithParameters");
+ jobExecParam.setEnv(envConf);
+ Map<String, Map<String, String>> tasks = new HashMap<>();
+ jobExecParam.setTasks(tasks);
+
+ int numberOfRecords = 100;
+ Map<String, String> task1Config = new HashMap<>();
+ task1Config.put("row.num", String.valueOf(numberOfRecords));
+ tasks.put("source-fakesource", task1Config);
+
+ Map<String, String> task2Config = new HashMap<>();
+ task2Config.put("replace_first", "true");
+ task2Config.put("is_regex", "true");
+ tasks.put("transform-replace", task2Config);
+
+ Map<String, String> task3Config = new HashMap<>();
+ task3Config.put("log.print.delay.ms", "99");
+ tasks.put("sink-console", task3Config);
+
+ result = jobExecutorControllerWrapper.jobExecutor(jobVersionId,
jobExecParam);
+ assertTrue(result.isSuccess());
+ assertTrue(result.getData() > 0);
+ listResult = JobUtils.waitForJobCompletion(result.getData());
+ assertEquals(1, listResult.getData().size());
+ assertEquals("FINISHED", listResult.getData().get(0).getStatus());
+ assertEquals(numberOfRecords,
listResult.getData().get(0).getReadRowCount());
+ assertEquals(numberOfRecords,
listResult.getData().get(0).getWriteRowCount());
+
+ // Do a few validations on the generated job file
+ generatedJobFile = getGenerateJobFile(String.valueOf(jobVersionId));
+ assertTrue(generatedJobFile.contains("\"is_regex\"=\"true\""));
+ assertTrue(generatedJobFile.contains("\"replace_first\"=\"true\""));
+ // database properties except query can not be updated
+
assertFalse(generatedJobFile.contains("\"log.print.delay.ms\"=\"99\""));
+
assertTrue(generatedJobFile.contains("\"job.name\"=executeJobWithParameters"));
+ }
+
+ @Test
+ public void executeJobWithParameters_AllowQueryUpdate() {
+ String jobName = "execJobUpdateQuery" + uniqueId;
+ JobCreateReq jobCreateReq =
JobUtils.populateMySQLJobCreateReqFromFile();
+ jobCreateReq.getJobConfig().setName(jobName);
+ jobCreateReq.getJobConfig().setDescription(jobName + " description");
+ String datasourceName = "execJobUpdateQuery_db" + uniqueId;
+ String mysqlDatasourceId =
+
seatunnelDatasourceControllerWrapper.createMysqlDatasource(datasourceName);
+ for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) {
+ pluginConfig.setDataSourceId(Long.parseLong(mysqlDatasourceId));
+ }
+ Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
+ assertTrue(job.isSuccess());
+ Long jobVersionId = job.getData();
+ Result<Long> result =
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
+ // Fails because of the wrong credentials of the database.
+ assertFalse(result.isSuccess());
+ String generatedJobFile =
getGenerateJobFile(String.valueOf(jobVersionId));
+ assertTrue(
+ generatedJobFile.contains(
+ "query=\"SELECT `name`, `age` FROM
`test`.`test_table`\""));
+ assertTrue(generatedJobFile.contains("user=someUser"));
+ assertTrue(generatedJobFile.contains("password=somePassword"));
+
+ JobExecParam jobExecParam = new JobExecParam();
+ Map<String, Map<String, String>> tasks = new HashMap<>();
+ jobExecParam.setTasks(tasks);
+
+ Map<String, String> task1Config = new HashMap<>();
+ task1Config.put("query", "SELECT `name`, `age` FROM
`test`.`test_table` LIMIT 10");
+ task1Config.put("user", "otherUser");
+ task1Config.put("password", "otherPassword");
+ tasks.put("mysql_source_1", task1Config);
+
+ result = jobExecutorControllerWrapper.jobExecutor(jobVersionId,
jobExecParam);
+ assertFalse(result.isSuccess());
+ // query should be changed but other database details should not be
changed.
+ generatedJobFile = getGenerateJobFile(String.valueOf(jobVersionId));
+ assertTrue(
+ generatedJobFile.contains(
+ "query=\"SELECT `name`, `age` FROM `test`.`test_table`
LIMIT 10\""));
+ assertTrue(generatedJobFile.contains("user=someUser"));
+ assertTrue(generatedJobFile.contains("password=somePassword"));
+ }
+
+ @Test
+ public void executeJobWithParameters_ChangeDatabase() {
+ String jobName = "execJobChangeDatabase" + uniqueId;
+ JobCreateReq jobCreateReq =
JobUtils.populateMySQLJobCreateReqFromFile();
+ jobCreateReq.getJobConfig().setName(jobName);
+ jobCreateReq.getJobConfig().setDescription(jobName + " description");
+ String datasourceName = "execJobChangeDatabase_db_1" + uniqueId;
+ String mysqlDatasourceId =
+
seatunnelDatasourceControllerWrapper.createMysqlDatasource(datasourceName);
+ for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) {
+ pluginConfig.setDataSourceId(Long.parseLong(mysqlDatasourceId));
+ }
+ Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
+ assertTrue(job.isSuccess());
+ Long jobVersionId = job.getData();
+ Result<Long> result =
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
+ // Fails because of the wrong credentials of the database.
+ assertFalse(result.isSuccess());
+ String generatedJobFile =
getGenerateJobFile(String.valueOf(jobVersionId));
+ assertTrue(
+ generatedJobFile.contains(
+ "query=\"SELECT `name`, `age` FROM
`test`.`test_table`\""));
+ assertTrue(generatedJobFile.contains("user=someUser"));
+ assertTrue(generatedJobFile.contains("password=somePassword"));
+
+ String datasourceName2 = "execJobChangeDatabase_db_2" + uniqueId;
+ DatasourceReq req = getDatasourceReq(datasourceName2);
+
+ Result<String> datasource =
seatunnelDatasourceControllerWrapper.createDatasource(req);
+ assertTrue(datasource.isSuccess());
+ JobExecParam jobExecParam = new JobExecParam();
+ Map<String, String> dbConfigs = new HashMap<>();
+ jobExecParam.setDatasource(dbConfigs);
+
+ dbConfigs.put("mysql_source_1", datasource.getData());
+
+ result = jobExecutorControllerWrapper.jobExecutor(jobVersionId,
jobExecParam);
+ assertFalse(result.isSuccess());
+ // query should be unchanged but the credentials should come from the new
datasource.
+ generatedJobFile = getGenerateJobFile(String.valueOf(jobVersionId));
+ assertTrue(
+ generatedJobFile.contains(
+ "query=\"SELECT `name`, `age` FROM
`test`.`test_table`\""));
+ assertTrue(generatedJobFile.contains("user=someUser2"));
+ assertTrue(generatedJobFile.contains("password=somePassword2"));
+ }
+
+ private static DatasourceReq getDatasourceReq(String datasourceName2) {
+ DatasourceReq req = new DatasourceReq();
+ req.setDatasourceName(datasourceName2);
+ req.setPluginName("JDBC-Mysql");
+ req.setDescription(datasourceName2 + " description");
+ req.setDatasourceConfig(
+
"{\"url\":\"jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true\",\"driver\":\"com.mysql.cj.jdbc.Driver\",\"user\":\"someUser2\",\"password\":\"somePassword2\"}");
+ return req;
+ }
+
@Test
public void restoreJob_shouldReturnSuccess_whenValidRequest() {
String jobName = "jobRestore" + uniqueId;
@@ -71,4 +247,15 @@ public class JobExecutorControllerTest {
public static void tearDown() {
seaTunnelWebCluster.stop();
}
+
+    /**
+     * Reads the generated job file for the given job id.
+     *
+     * <p>The file is looked up at {@code profile/<jobId>.conf}. The path is relative, so it
+     * resolves against the working directory of the test run.
+     *
+     * @param jobId identifier used as the file name (the tests pass the job version id)
+     * @return the full content of the file
+     * @throws RuntimeException if the file cannot be read; the {@link IOException} is the cause
+     */
+    private String getGenerateJobFile(String jobId) {
+        String filePath = "profile/" + jobId + ".conf";
+        try {
+            byte[] rawContent = Files.readAllBytes(Paths.get(filePath));
+            // Decode explicitly instead of relying on the platform default charset.
+            // NOTE(review): assumes the file is written as UTF-8 - confirm against the writer.
+            return new String(rawContent, java.nio.charset.StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to read generated job file: " + filePath, e);
+        }
+    }
}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
index 0a530d3b..a9214baf 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
@@ -24,10 +24,14 @@ import
org.apache.seatunnel.app.controller.JobTaskControllerWrapper;
import
org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
import org.apache.seatunnel.app.domain.request.job.Edge;
import org.apache.seatunnel.app.domain.request.job.JobConfig;
+import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
import org.apache.seatunnel.app.domain.request.job.JobDAG;
import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes;
import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -138,4 +142,15 @@ public class JobUtils {
assertTrue(jobTaskCheckResResult.isSuccess());
return jobVersionId;
}
+
+    /**
+     * Builds a {@link JobCreateReq} from the job definition stored in
+     * {@code src/test/resources/jobs/mysql_source_mysql_sink.json}.
+     *
+     * <p>The path is relative, so it resolves against the working directory of the test run.
+     *
+     * @return the request parsed from the JSON file
+     * @throws RuntimeException if the file cannot be read; the {@link IOException} is the cause
+     */
+    public static JobCreateReq populateMySQLJobCreateReqFromFile() {
+        String filePath = "src/test/resources/jobs/mysql_source_mysql_sink.json";
+        String jsonContent;
+        try {
+            // Decode explicitly as UTF-8 so parsing does not depend on the platform charset.
+            jsonContent =
+                    new String(
+                            Files.readAllBytes(Paths.get(filePath)),
+                            java.nio.charset.StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to read job definition file: " + filePath, e);
+        }
+        return JSONTestUtils.parseObject(jsonContent, JobCreateReq.class);
+    }
}
diff --git
a/seatunnel-web-it/src/test/resources/jobs/mysql_source_mysql_sink.json
b/seatunnel-web-it/src/test/resources/jobs/mysql_source_mysql_sink.json
new file mode 100644
index 00000000..967244f5
--- /dev/null
+++ b/seatunnel-web-it/src/test/resources/jobs/mysql_source_mysql_sink.json
@@ -0,0 +1,106 @@
+{
+ "jobConfig": {
+ "name": "mysql_source_mysql_sink",
+ "description": "mysql_source_mysql_sink description",
+ "engine": "SeaTunnel",
+ "env": {
+ "job.mode": "BATCH",
+ "job.name": "SeaTunnel_Job",
+ "jars": "",
+ "checkpoint.interval": "",
+ "checkpoint.timeout": "",
+ "read_limit.rows_per_second": "",
+ "read_limit.bytes_per_second": "",
+ "custom_parameters": ""
+ }
+ },
+ "pluginConfigs": [
+ {
+ "pluginId": "1724412762429155",
+ "name": "mysql_source_1",
+ "type": "SOURCE",
+ "connectorType": null,
+ "tableOption": {
+ "databases": [
+ "test"
+ ],
+ "tables": [
+ "test_table"
+ ]
+ },
+ "selectTableFields": {
+ "tableFields": [
+ "name",
+ "age"
+ ],
+ "all": true
+ },
+ "dataSourceId": 14717667385504,
+ "sceneMode": "SINGLE_TABLE",
+ "config":
"{\"query\":\"\",\"connection_check_timeout_sec\":30,\"fetch_size\":\"\",\"partition_column\":\"\",\"partition_upper_bound\":\"\",\"partition_lower_bound\":\"\",\"partition_num\":\"\",\"compatible_mode\":\"\",\"properties\":\"\",\"table_path\":\"\",\"where_condition\":\"\",\"table_list\":\"\",\"split.size\":8096,\"split.even-distribution.factor.upper-bound\":100,\"split.even-distribution.factor.lower-bound\":0.05,\"split.sample-sharding.threshold\":1000,\"split.inverse-sa
[...]
+ "outputSchema": [
+ {
+ "fields": [
+ {
+ "type": "LONGTEXT",
+ "name": "name",
+ "comment": "",
+ "primaryKey": false,
+ "defaultValue": null,
+ "nullable": false,
+ "properties": null,
+ "unSupport": false,
+ "outputDataType": "STRING"
+ },
+ {
+ "type": "INT",
+ "name": "age",
+ "comment": "",
+ "primaryKey": false,
+ "defaultValue": null,
+ "nullable": false,
+ "properties": null,
+ "unSupport": false,
+ "outputDataType": "INT"
+ }
+ ],
+ "tableName": "test_table",
+ "database": "test"
+ }
+ ],
+ "transformOptions": {}
+ },
+ {
+ "pluginId": "17244128298414uc",
+ "name": "mysql_sink_1",
+ "type": "SINK",
+ "connectorType": null,
+ "tableOption": {
+ "databases": [
+ "test"
+ ],
+ "tables": [
+ "test_table"
+ ]
+ },
+ "selectTableFields": {
+ "tableFields": [
+ "name",
+ "age"
+ ],
+ "all": true
+ },
+ "dataSourceId": 14717667385504,
+ "config":
"{\"query\":\"\",\"schema_save_mode\":\"CREATE_SCHEMA_WHEN_NOT_EXIST\",\"data_save_mode\":\"APPEND_DATA\",\"custom_sql\":\"\",\"connection_check_timeout_sec\":30,\"batch_size\":1000,\"is_exactly_once\":\"false\",\"xa_data_source_class_name\":\"\",\"max_commit_attempts\":3,\"transaction_timeout_sec\":-1,\"max_retries\":\"1\",\"auto_commit\":\"true\",\"support_upsert_by_query_primary_key_exist\":\"false\",\"primary_keys\":\"\",\"compatible_mode\":\"\",\"multi_table_sink_rep
[...]
+ "transformOptions": {}
+ }
+ ],
+ "jobDAG": {
+ "edges": [
+ {
+ "inputPluginId": "mysql_source_1",
+ "targetPluginId": "mysql_sink_1"
+ }
+ ]
+ }
+}
\ No newline at end of file