This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new 70e526a2 [Feature][Seatunnel-web]Add support to configure placeholder
with default value in the job config. (#208)
70e526a2 is described below
commit 70e526a2d38a4fe1243dc0e9d74edeb4f3826941
Author: Mohammad Arshad <[email protected]>
AuthorDate: Mon Sep 9 07:55:19 2024 +0530
[Feature][Seatunnel-web]Add support to configure placeholder with default
value in the job config. (#208)
---
.../app/domain/request/job/JobExecParam.java | 6 +-
.../app/service/impl/JobExecutorServiceImpl.java | 2 +-
.../app/service/impl/JobInstanceServiceImpl.java | 14 +--
.../org/apache/seatunnel/app/utils/JobUtils.java | 74 ++++++-------
.../apache/seatunnel/app/utils/JobUtilsTests.java | 120 +++++++++++++++++++++
.../server/common/SeatunnelErrorEnum.java | 4 +-
.../app/controller/JobControllerWrapper.java | 15 ---
.../seatunnel/app/test/JobControllerTest.java | 45 +++-----
.../app/test/JobExecutorControllerTest.java | 77 +++----------
.../seatunnel/app/utils/JobTestingUtils.java | 41 +++++++
.../resources/jobs/fake_source_console_job.json | 52 ++++++++-
.../src/test/resources/logback-spring.xml | 2 +-
12 files changed, 276 insertions(+), 176 deletions(-)
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java
index 223429cc..3aa54ecd 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java
@@ -27,10 +27,8 @@ import java.util.Map;
@AllArgsConstructor
// Job execution parameters
public class JobExecParam {
- // job name -> key -> value
- private Map<String, String> env;
- // task name -> key -> value
- private Map<String, Map<String, String>> tasks;
+ // job config placeholder name -> value
+ private Map<String, String> placeholderValues;
// task name -> new datasource id
private Map<String, String> datasource;
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
index ec1b37de..185222eb 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
@@ -80,7 +80,7 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
return Result.success(executeResource.getJobInstanceId());
} catch (RuntimeException e) {
Result<Long> failure =
-
Result.failure(SeatunnelErrorEnum.JUB_EXEC_SUBMISSION_ERROR, e.getMessage());
+
Result.failure(SeatunnelErrorEnum.JOB_EXEC_SUBMISSION_ERROR, e.getMessage());
// Even though job execution submission failed, we still need to
return the
// jobInstanceId to the user
// as the job instance has been created in the database.
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
index d58d19b4..70b1201e 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
@@ -174,7 +174,6 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
BusinessMode businessMode =
BusinessMode.valueOf(jobDefinitionDao.getJob(jobId).getJobType());
Config envConfig = filterEmptyValue(ConfigFactory.parseString(envStr));
- envConfig = JobUtils.updateEnvConfig(executeParam, envConfig);
JobUtils.updateDataSource(executeParam, tasks);
Map<String, List<Config>> sourceMap = new LinkedHashMap<>();
@@ -230,8 +229,6 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
ParsingMode.SHARDING.name()));
}
- config =
- JobUtils.updateTaskConfig(executeParam,
config, task.getName());
Config mergeConfig =
mergeTaskConfig(
task,
@@ -240,9 +237,6 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
businessMode,
config,
optionRule);
- mergeConfig =
- JobUtils.updateQueryTaskConfig(
- executeParam, mergeConfig,
task.getName());
sourceMap
.get(task.getConnectorType())
.add(filterEmptyValue(mergeConfig));
@@ -272,9 +266,6 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
}
List<TableSchemaReq> inputSchemas =
findInputSchemas(tasks, lines, task);
Config transformConfig = buildTransformConfig(task,
config, inputSchemas);
- transformConfig =
- JobUtils.updateTaskConfig(
- executeParam, transformConfig,
task.getName());
transformMap
.get(task.getConnectorType())
.add(filterEmptyValue(transformConfig));
@@ -289,8 +280,6 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
if (!sinkMap.containsKey(task.getConnectorType()))
{
sinkMap.put(task.getConnectorType(), new
ArrayList<>());
}
- config =
- JobUtils.updateTaskConfig(executeParam,
config, task.getName());
Config mergeConfig =
mergeTaskConfig(
task,
@@ -341,7 +330,8 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
.setJson(false)
.setComments(false)
.setOriginComments(false));
- return SeaTunnelConfigUtil.generateConfig(env, sources, transforms,
sinks);
+ String jobConfig = SeaTunnelConfigUtil.generateConfig(env, sources,
transforms, sinks);
+ return JobUtils.replaceJobConfigPlaceholders(jobConfig, executeParam);
}
@Override
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java
index bfa36830..b51db986 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java
@@ -16,20 +16,23 @@
*/
package org.apache.seatunnel.app.utils;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
-
import org.apache.seatunnel.app.dal.entity.JobTask;
import org.apache.seatunnel.app.domain.request.job.JobExecParam;
import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
+import org.apache.seatunnel.server.common.SeatunnelException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
public class JobUtils {
// The maximum length of the job execution error message, 4KB
private static final int ERROR_MESSAGE_MAX_LENGTH = 4096;
+ private static final Pattern placeholderPattern =
Pattern.compile("\\$\\{(\\w+)(?::(.*?))?\\}");
public static String getJobInstanceErrorMessage(String message) {
if (message == null) {
@@ -40,46 +43,6 @@ public class JobUtils {
: message;
}
- public static Config updateEnvConfig(JobExecParam jobExecParam, Config
envConfig) {
- if (jobExecParam == null || jobExecParam.getEnv() == null) {
- return envConfig;
- }
- return updateConfig(envConfig, jobExecParam.getEnv());
- }
-
- private static Config updateConfig(Config config, Map<String, String>
properties) {
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- config =
- config.withValue(
- entry.getKey(),
ConfigValueFactory.fromAnyRef(entry.getValue()));
- }
- return config;
- }
-
- public static Config updateTaskConfig(
- JobExecParam jobExecParam, Config taskConfig, String taskName) {
- if (jobExecParam == null
- || jobExecParam.getTasks() == null
- || jobExecParam.getTasks().get(taskName) == null) {
- return taskConfig;
- }
- return updateConfig(taskConfig, jobExecParam.getTasks().get(taskName));
- }
-
- public static Config updateQueryTaskConfig(
- JobExecParam jobExecParam, Config taskConfig, String taskName) {
- if (jobExecParam == null
- || jobExecParam.getTasks() == null
- || jobExecParam.getTasks().get(taskName) == null) {
- return taskConfig;
- }
- String query = jobExecParam.getTasks().get(taskName).get("query");
- if (query != null) {
- return taskConfig.withValue("query",
ConfigValueFactory.fromAnyRef(query));
- }
- return taskConfig;
- }
-
public static void updateDataSource(JobExecParam jobExecParam,
List<JobTask> tasks) {
if (jobExecParam == null || jobExecParam.getDatasource() == null) {
return;
@@ -104,4 +67,29 @@ public class JobUtils {
|| JobStatus.CANCELED == jobStatus
|| JobStatus.FAILED == jobStatus;
}
+
+ // Replace placeholders in job config with actual values
+ public static String replaceJobConfigPlaceholders(
+ String jobConfigString, JobExecParam jobExecParam) {
+ Map<String, String> placeholderValues =
+ (jobExecParam != null && jobExecParam.getPlaceholderValues()
!= null)
+ ? jobExecParam.getPlaceholderValues()
+ : Collections.emptyMap();
+
+ Matcher matcher = placeholderPattern.matcher(jobConfigString);
+ StringBuffer result = new StringBuffer();
+
+ while (matcher.find()) {
+ String placeholderName = matcher.group(1);
+ String replacement =
placeholderValues.getOrDefault(placeholderName, matcher.group(2));
+ if (replacement == null) {
+ throw new SeatunnelException(
+ SeatunnelErrorEnum.JOB_NO_VALUE_FOUND_FOR_PLACEHOLDER,
placeholderName);
+ }
+ matcher.appendReplacement(result, replacement);
+ }
+
+ matcher.appendTail(result);
+ return result.toString();
+ }
}
diff --git
a/seatunnel-server/seatunnel-app/src/test/java/org/apache/seatunnel/app/utils/JobUtilsTests.java
b/seatunnel-server/seatunnel-app/src/test/java/org/apache/seatunnel/app/utils/JobUtilsTests.java
new file mode 100644
index 00000000..31c37fb9
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/test/java/org/apache/seatunnel/app/utils/JobUtilsTests.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.utils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
+import org.apache.seatunnel.server.common.SeatunnelException;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class JobUtilsTests {
+
+ @Test
+ public void
testReplaceJobConfigPlaceholders_AllJobConfigPlaceholdersReplaced() {
+ String jobConfigContent =
+
"job.mode=${jobModeParam:BATCH}\ncheckpoint.interval=30\njob.name=${jobNameParam}";
+ Map<String, String> paramValues = new HashMap<>();
+ paramValues.put("jobModeParam", "STREAMING");
+ paramValues.put("jobNameParam", "newJob");
+ JobExecParam jobExecParam = getJobExecParam(paramValues);
+
+ String expected =
"job.mode=STREAMING\ncheckpoint.interval=30\njob.name=newJob";
+ String actual =
JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void
testReplaceJobConfigPlaceholders_JobConfig_PlaceholdersRepeat() {
+ String jobConfigContent =
+
"job.mode=${jobModeParam:BATCH}\ncheckpoint.interval=30\njob.name=${jobModeParam}";
+ Map<String, String> paramValues = new HashMap<>();
+ paramValues.put("jobModeParam", "STREAMING");
+ JobExecParam jobExecParam = getJobExecParam(paramValues);
+
+ String expected =
"job.mode=STREAMING\ncheckpoint.interval=30\njob.name=STREAMING";
+ String actual =
JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testReplaceJobConfigPlaceholdersUsed() {
+ String jobConfigContent =
+
"job.mode=${jobModeParam:BATCH}\ncheckpoint.interval=30\njob.name=${jobNameParam:DefaultJob}";
+ Map<String, String> paramValues = new HashMap<>();
+ paramValues.put("jobModeParam", "STREAMING");
+ JobExecParam jobExecParam = getJobExecParam(paramValues);
+
+ String expected =
"job.mode=STREAMING\ncheckpoint.interval=30\njob.name=DefaultJob";
+ String actual =
JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void
testReplaceJobConfigPlaceholders_NoDefaultValueThrowsException() {
+ String jobConfigContent =
+
"job.mode=${jobModeParam}\ncheckpoint.interval=30\njob.name=${jobNameParam}";
+ Map<String, String> paramValues = new HashMap<>();
+ paramValues.put("jobModeParam", "STREAMING");
+ JobExecParam jobExecParam = getJobExecParam(paramValues);
+
+ assertThrows(
+ SeatunnelException.class,
+ () -> {
+ JobUtils.replaceJobConfigPlaceholders(jobConfigContent,
jobExecParam);
+ });
+ }
+
+ @Test
+ public void testReplaceJobConfigPlaceholders_NoJobConfigPlaceholders() {
+ String jobConfigContent =
"job.mode=STREAMING\ncheckpoint.interval=30\njob.name=newJob";
+ Map<String, String> paramValues = new HashMap<>();
+ JobExecParam jobExecParam = getJobExecParam(paramValues);
+
+ String expected =
"job.mode=STREAMING\ncheckpoint.interval=30\njob.name=newJob";
+ String actual =
JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testParseConfigWithPlaceHolders() {
+ String transformConfig =
+
"{\"log.print.data\":\"true\",\"log.print.delay.ms\":\"${logPrintDelayMs:100}\"}";
+ Config config = ConfigFactory.parseString(transformConfig);
+ assertNotNull(config);
+ }
+
+ private static @NotNull JobExecParam getJobExecParam(Map<String, String>
paramValues) {
+ JobExecParam jobExecParam = new JobExecParam();
+ jobExecParam.setPlaceholderValues(paramValues);
+ return jobExecParam;
+ }
+}
diff --git
a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
index dc70cb27..53dde9ad 100644
---
a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
+++
b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
@@ -82,9 +82,11 @@ public enum SeatunnelErrorEnum {
"load job state from engine error",
"load job statue from engine [%s] error, error msg is [%s]"),
UNSUPPORTED_ENGINE(40003, "unsupported engine", "unsupported engine [%s]
version [%s]"),
- JUB_EXEC_SUBMISSION_ERROR(40004, "Job execution submission error.", "%s"),
+ JOB_EXEC_SUBMISSION_ERROR(40004, "Job execution submission error.", "%s"),
LOAD_ENGINE_METRICS_ERROR(
40005, "load engine metrics error", "load engine metrics error.
error msg is [%s]"),
+ JOB_NO_VALUE_FOUND_FOR_PLACEHOLDER(
+ 40006, "No value found for placeholder", "No value found for
placeholder: [%s]"),
JOB_RUN_GENERATE_UUID_ERROR(50001, "generate uuid error", "generate uuid
error"),
/* datasource and virtual table */
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java
index b899835f..be654fba 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java
@@ -25,10 +25,6 @@ import org.apache.seatunnel.app.utils.JSONUtils;
import com.fasterxml.jackson.core.type.TypeReference;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-
public class JobControllerWrapper extends SeatunnelWebTestingBase {
public Result<Long> createJob(JobCreateReq jobCreateRequest) {
@@ -48,15 +44,4 @@ public class JobControllerWrapper extends
SeatunnelWebTestingBase {
String response = sendRequest(urlWithParam("job/get/" + jobVersionId +
"?"), null, "GET");
return JSONTestUtils.parseObject(response, new
TypeReference<Result<JobRes>>() {});
}
-
- public JobCreateReq populateJobCreateReqFromFile() {
- String filePath =
"src/test/resources/jobs/fake_source_console_job.json";
- String jsonContent;
- try {
- jsonContent = new String(Files.readAllBytes(Paths.get(filePath)));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return JSONTestUtils.parseObject(jsonContent, JobCreateReq.class);
- }
}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
index c963324c..6b51a94c 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
@@ -20,7 +20,6 @@ import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
import org.apache.seatunnel.app.controller.JobControllerWrapper;
import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
-import
org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
import org.apache.seatunnel.app.domain.request.job.JobConfig;
import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
import org.apache.seatunnel.app.domain.request.job.JobDAG;
@@ -45,7 +44,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class JobControllerTest {
private static final SeaTunnelWebCluster seaTunnelWebCluster = new
SeaTunnelWebCluster();
- private static SeatunnelDatasourceControllerWrapper
seatunnelDatasourceControllerWrapper;
private static JobControllerWrapper jobControllerWrapper;
private static JobExecutorControllerWrapper jobExecutorControllerWrapper;
private static final String uniqueId = "_" + System.currentTimeMillis();
@@ -53,7 +51,6 @@ public class JobControllerTest {
@BeforeAll
public static void setUp() {
seaTunnelWebCluster.start();
- seatunnelDatasourceControllerWrapper = new
SeatunnelDatasourceControllerWrapper();
jobControllerWrapper = new JobControllerWrapper();
jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
}
@@ -61,11 +58,9 @@ public class JobControllerTest {
@Test
public void createJobWithSingleAPI_shouldExecuteSuccessfully() {
String jobName = "jobWithSingleAPI" + uniqueId;
- JobCreateReq jobCreateReq =
jobControllerWrapper.populateJobCreateReqFromFile();
- jobCreateReq.getJobConfig().setName(jobName);
- jobCreateReq.getJobConfig().setDescription(jobName + " description");
- setSourceIds(jobCreateReq, "fake_source_create" + uniqueId,
"console_create" + uniqueId);
-
+ JobCreateReq jobCreateReq =
+ JobTestingUtils.populateJobCreateReqFromFile(
+ jobName, "fake_source_create" + uniqueId,
"console_create" + uniqueId);
Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
assertTrue(job.isSuccess());
Result<Long> result =
jobExecutorControllerWrapper.jobExecutor(job.getData());
@@ -79,25 +74,12 @@ public class JobControllerTest {
assertEquals(5, listResult.getData().get(0).getWriteRowCount());
}
- private void setSourceIds(
- JobCreateReq jobCreateReq, String fsdSourceName, String
csSourceName) {
- // Set the data source id for the plugin configs
- String fakeSourceDataSourceId =
-
seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(fsdSourceName);
- String consoleDataSourceId =
-
seatunnelDatasourceControllerWrapper.createConsoleDatasource(csSourceName);
- for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) {
- if (pluginConfig.getName().equals("source-fake-source")) {
-
pluginConfig.setDataSourceId(Long.parseLong(fakeSourceDataSourceId));
- } else if (pluginConfig.getName().equals("sink-console")) {
-
pluginConfig.setDataSourceId(Long.parseLong(consoleDataSourceId));
- }
- }
- }
-
@Test
public void createJobWithSingleAPI_ValidateInput() {
- JobCreateReq jobCreateReq =
jobControllerWrapper.populateJobCreateReqFromFile();
+ String jobName = "jobWithSingleAPI2" + uniqueId;
+ JobCreateReq jobCreateReq =
+ JobTestingUtils.populateJobCreateReqFromFile(
+ jobName, "fake_source_create_2" + uniqueId,
"console_create_2" + uniqueId);
JobConfig jobConfig = jobCreateReq.getJobConfig();
jobConfig.setName("");
Result<Long> result = jobControllerWrapper.createJob(jobCreateReq);
@@ -105,7 +87,7 @@ public class JobControllerTest {
assertEquals(SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL.getCode(),
result.getCode());
assertEquals("param [name] can not be null or empty", result.getMsg());
- String jobName = "jobValidation" + uniqueId;
+ jobName = "jobValidation" + uniqueId;
jobConfig.setName(jobName);
jobConfig.setDescription(null);
result = jobControllerWrapper.createJob(jobCreateReq);
@@ -129,7 +111,8 @@ public class JobControllerTest {
result.getMsg());
jobConfig.getEnv().put("job.mode", "BATCH");
- setSourceIds(jobCreateReq, "fake_source_create2" + uniqueId,
"console_create2" + uniqueId);
+ // setSourceIds(jobCreateReq, "fake_source_create2" + uniqueId,
"console_create2" +
+ // uniqueId);
result = jobControllerWrapper.createJob(jobCreateReq);
assertTrue(result.isSuccess());
assertEquals(0, result.getCode());
@@ -139,11 +122,9 @@ public class JobControllerTest {
@Test
public void testUpdateJob_ForValidAndInvalidScenarios() {
String jobName = "updateJob_single_api" + uniqueId;
- JobCreateReq jobCreateReq =
jobControllerWrapper.populateJobCreateReqFromFile();
- jobCreateReq.getJobConfig().setName(jobName);
- jobCreateReq.getJobConfig().setDescription(jobName + " description");
- setSourceIds(
- jobCreateReq, "fake_source_update_job" + uniqueId,
"console_update_job" + uniqueId);
+ JobCreateReq jobCreateReq =
+ JobTestingUtils.populateJobCreateReqFromFile(
+ jobName, "fake_source_create_3" + uniqueId,
"console_create_3" + uniqueId);
Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
assertTrue(job.isSuccess());
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
index 6e1cb1ea..63488d11 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
@@ -81,7 +81,10 @@ public class JobExecutorControllerTest {
@Test
public void executeJobWithParameters() {
String jobName = "execJobWithParam" + uniqueId;
- long jobVersionId = JobTestingUtils.createJob(jobName);
+ JobCreateReq jobCreateReq =
+ JobTestingUtils.populateJobCreateReqFromFile(
+ jobName, "fake_source_exec-1" + uniqueId,
"console_exec-1" + uniqueId);
+ long jobVersionId = JobTestingUtils.createJob(jobCreateReq);
Result<Long> result =
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
assertTrue(result.isSuccess());
assertTrue(result.getData() > 0);
@@ -96,26 +99,19 @@ public class JobExecutorControllerTest {
assertTrue(generatedJobFile.contains("\"log.print.delay.ms\"=\"100\""));
JobExecParam jobExecParam = new JobExecParam();
+ Map<String, String> placeholderValues = new HashMap<>();
- Map<String, String> envConf = new HashMap<>();
- envConf.put("job.name", "executeJobWithParameters");
- jobExecParam.setEnv(envConf);
- Map<String, Map<String, String>> tasks = new HashMap<>();
- jobExecParam.setTasks(tasks);
-
+ // source configuration
int numberOfRecords = 100;
- Map<String, String> task1Config = new HashMap<>();
- task1Config.put("row.num", String.valueOf(numberOfRecords));
- tasks.put("source-fakesource", task1Config);
+ placeholderValues.put("rowNum", String.valueOf(numberOfRecords));
- Map<String, String> task2Config = new HashMap<>();
- task2Config.put("replace_first", "true");
- task2Config.put("is_regex", "true");
- tasks.put("transform-replace", task2Config);
+ // transform configuration
+ placeholderValues.put("firstReplace", "true");
+ placeholderValues.put("isRegex", "true");
- Map<String, String> task3Config = new HashMap<>();
- task3Config.put("log.print.delay.ms", "99");
- tasks.put("sink-console", task3Config);
+ // sink configuration
+ placeholderValues.put("logPrintDelayMs", "99");
+ jobExecParam.setPlaceholderValues(placeholderValues);
result = jobExecutorControllerWrapper.jobExecutor(jobVersionId,
jobExecParam);
assertTrue(result.isSuccess());
@@ -132,53 +128,6 @@ public class JobExecutorControllerTest {
assertTrue(generatedJobFile.contains("\"replace_first\"=\"true\""));
// database properties except query can not be updated
assertFalse(generatedJobFile.contains("\"log.print.delay.ms\"=\"99\""));
-
assertTrue(generatedJobFile.contains("\"job.name\"=executeJobWithParameters"));
- }
-
- @Test
- public void executeJobWithParameters_AllowQueryUpdate() {
- String jobName = "execJobUpdateQuery" + uniqueId;
- JobCreateReq jobCreateReq =
JobTestingUtils.populateMySQLJobCreateReqFromFile();
- jobCreateReq.getJobConfig().setName(jobName);
- jobCreateReq.getJobConfig().setDescription(jobName + " description");
- String datasourceName = "execJobUpdateQuery_db" + uniqueId;
- String mysqlDatasourceId =
-
seatunnelDatasourceControllerWrapper.createMysqlDatasource(datasourceName);
- for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) {
- pluginConfig.setDataSourceId(Long.parseLong(mysqlDatasourceId));
- }
- Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
- assertTrue(job.isSuccess());
- Long jobVersionId = job.getData();
- Result<Long> result =
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
- // Fails because of the wrong credentials of the database.
- assertFalse(result.isSuccess());
- String generatedJobFile =
getGenerateJobFile(String.valueOf(jobVersionId));
- assertTrue(
- generatedJobFile.contains(
- "query=\"SELECT `name`, `age` FROM
`test`.`test_table`\""));
- assertTrue(generatedJobFile.contains("user=someUser"));
- assertTrue(generatedJobFile.contains("password=somePassword"));
-
- JobExecParam jobExecParam = new JobExecParam();
- Map<String, Map<String, String>> tasks = new HashMap<>();
- jobExecParam.setTasks(tasks);
-
- Map<String, String> task1Config = new HashMap<>();
- task1Config.put("query", "SELECT `name`, `age` FROM
`test`.`test_table` LIMIT 10");
- task1Config.put("user", "otherUser");
- task1Config.put("password", "otherPassword");
- tasks.put("mysql_source_1", task1Config);
-
- result = jobExecutorControllerWrapper.jobExecutor(jobVersionId,
jobExecParam);
- assertFalse(result.isSuccess());
- // query should be changed but other database details should not be
changed,
- generatedJobFile = getGenerateJobFile(String.valueOf(jobVersionId));
- assertTrue(
- generatedJobFile.contains(
- "query=\"SELECT `name`, `age` FROM `test`.`test_table`
LIMIT 10\""));
- assertTrue(generatedJobFile.contains("user=someUser"));
- assertTrue(generatedJobFile.contains("password=somePassword"));
}
@Test
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java
index f4d6c525..e9dc309f 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java
@@ -18,6 +18,7 @@ package org.apache.seatunnel.app.utils;
import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.app.controller.JobConfigControllerWrapper;
+import org.apache.seatunnel.app.controller.JobControllerWrapper;
import org.apache.seatunnel.app.controller.JobDefinitionControllerWrapper;
import org.apache.seatunnel.app.controller.JobMetricsControllerWrapper;
import org.apache.seatunnel.app.controller.JobTaskControllerWrapper;
@@ -26,6 +27,7 @@ import org.apache.seatunnel.app.domain.request.job.Edge;
import org.apache.seatunnel.app.domain.request.job.JobConfig;
import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
import org.apache.seatunnel.app.domain.request.job.JobDAG;
+import org.apache.seatunnel.app.domain.request.job.PluginConfig;
import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes;
import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
@@ -49,6 +51,7 @@ public class JobTestingUtils {
new JobTaskControllerWrapper();
private static SeatunnelDatasourceControllerWrapper
seatunnelDatasourceControllerWrapper =
new SeatunnelDatasourceControllerWrapper();
+ private static JobControllerWrapper jobControllerWrapper = new
JobControllerWrapper();
private static final long TIMEOUT = 60; // 1 minute
private static final long INTERVAL = 2; // 1 second
@@ -166,4 +169,42 @@ public class JobTestingUtils {
}
return JSONTestUtils.parseObject(jsonContent, JobCreateReq.class);
}
+
+ public static JobCreateReq populateJobCreateReqFromFile(
+ String jobName, String fsdSourceName, String csSourceName) {
+ String filePath =
"src/test/resources/jobs/fake_source_console_job.json";
+ String jsonContent;
+ try {
+ jsonContent = new String(Files.readAllBytes(Paths.get(filePath)));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ JobCreateReq jobCreateReq = JSONTestUtils.parseObject(jsonContent,
JobCreateReq.class);
+ jobCreateReq.getJobConfig().setName(jobName);
+ jobCreateReq.getJobConfig().setDescription(jobName + " description");
+ setSourceIds(jobCreateReq, fsdSourceName, csSourceName);
+ return jobCreateReq;
+ }
+
+ private static void setSourceIds(
+ JobCreateReq jobCreateReq, String fsdSourceName, String
csSourceName) {
+ // Set the data source id for the plugin configs
+ String fakeSourceDataSourceId =
+
seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(fsdSourceName);
+ String consoleDataSourceId =
+
seatunnelDatasourceControllerWrapper.createConsoleDatasource(csSourceName);
+ for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) {
+ if (pluginConfig.getName().equals("source-fake-source")) {
+
pluginConfig.setDataSourceId(Long.parseLong(fakeSourceDataSourceId));
+ } else if (pluginConfig.getName().equals("sink-console")) {
+
pluginConfig.setDataSourceId(Long.parseLong(consoleDataSourceId));
+ }
+ }
+ }
+
+ public static Long createJob(JobCreateReq jobCreateReq) {
+ Result<Long> jobCreateResult =
jobControllerWrapper.createJob(jobCreateReq);
+ assertTrue(jobCreateResult.isSuccess());
+ return jobCreateResult.getData();
+ }
}
diff --git
a/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json
b/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json
index acab2f66..324c1d9b 100644
--- a/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json
+++ b/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json
@@ -5,7 +5,7 @@
"engine": "SeaTunnel",
"env": {
"job.mode": "BATCH",
- "job.name": "SeaTunnel_Job",
+ "job.name": "${jonName:SeaTunnel_Job}",
"jars": "",
"checkpoint.interval": "",
"checkpoint.timeout": "",
@@ -36,7 +36,7 @@
},
"dataSourceId": 1,
"sceneMode": "SINGLE_TABLE",
- "config": "{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields
{\\n name = \\\"string\\\"\\n age = \\\"int\\\"\\n
}\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.template\":\"\",\"doubl
[...]
+ "config": "{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields
{\\n name = \\\"string\\\"\\n age = \\\"int\\\"\\n
}\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.template\":\"\",\"doubl
[...]
"outputSchema": [
{
"fields": [
@@ -69,6 +69,48 @@
],
"transformOptions": {}
},
+ {
+ "name": "transform-replace",
+ "type": "TRANSFORM",
+ "connectorType": "Replace",
+ "selectTableFields": {
+ "tableFields": [],
+ "all": false
+ },
+ "sceneMode": "SINGLE_TABLE",
+ "config":
"{\"query\":\"\",\"replace_field\":\"name\",\"pattern\":\"OK\",\"replacement\":\"ITS
OK.\",\"is_regex\":\"${isRegex:false}\",\"replace_first\":\"${firstReplace:false}\"}",
+ "outputSchema": [
+ {
+ "fields": [
+ {
+ "type": "string",
+ "name": "name",
+ "comment": "",
+ "primaryKey": true,
+ "defaultValue": null,
+ "nullable": false,
+ "properties": null,
+ "unSupport": false,
+ "outputDataType": "STRING"
+ },
+ {
+ "type": "int",
+ "name": "age",
+ "comment": "",
+ "primaryKey": false,
+ "defaultValue": null,
+ "nullable": false,
+ "properties": null,
+ "unSupport": false,
+ "outputDataType": "INT"
+ }
+ ],
+ "tableName": "fake_table",
+ "database": "fake_database"
+ }
+ ],
+ "transformOptions": {}
+ },
{
"name": "sink-console",
"type": "SINK",
@@ -86,7 +128,7 @@
"all": false
},
"dataSourceId": 2,
- "config": "{\"log.print.data\":\"true\",\"log.print.delay.ms\":\"100\"}",
+ "config":
"{\"log.print.data\":\"true\",\"log.print.delay.ms\":\"${logPrintDelayMs:100}\"}",
"transformOptions": {}
}
],
@@ -94,6 +136,10 @@
"edges": [
{
"inputPluginId": "source-fake-source",
+ "targetPluginId": "transform-replace"
+ },
+ {
+ "inputPluginId": "transform-replace",
"targetPluginId": "sink-console"
}
]
diff --git a/seatunnel-web-it/src/test/resources/logback-spring.xml
b/seatunnel-web-it/src/test/resources/logback-spring.xml
index 145a239d..74ccf4cd 100644
--- a/seatunnel-web-it/src/test/resources/logback-spring.xml
+++ b/seatunnel-web-it/src/test/resources/logback-spring.xml
@@ -41,7 +41,7 @@
</encoder>
</appender>
- <root level="INFO">
+ <root level="DEBUG">
<appender-ref ref="seatunnel-web" />
<appender-ref ref="console" />
</root>