This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e2a4b64148 [Feature][starter] support user define parameter on
spark/flink engine (#6387)
e2a4b64148 is described below
commit e2a4b6414887684c2ad35b9ed6bbf353d25a34ec
Author: Jarvis <[email protected]>
AuthorDate: Tue Apr 30 09:12:14 2024 +0800
[Feature][starter] support user define parameter on spark/flink engine
(#6387)
---
docs/en/concept/config.md | 110 +++++++++++++++++++++
.../core/starter/command/ConfEncryptCommand.java | 17 +++-
.../core/starter/utils/ConfigBuilder.java | 53 +++++++---
.../core/starter/utils/ConfigShadeTest.java | 50 ++++++++++
.../src/test/resources/config.variables.conf | 68 +++++++++++++
.../seatunnel/core/starter/flink/FlinkStarter.java | 2 +-
.../seatunnel/core/starter/flink/FlinkStarter.java | 2 +-
.../flink/command/FlinkTaskExecuteCommand.java | 2 +-
.../seatunnel/core/starter/spark/SparkStarter.java | 42 ++------
.../core/starter/spark/SparkStarterTest.java | 2 +-
.../seatunnel/core/starter/spark/SparkStarter.java | 42 ++------
.../spark/command/SparkTaskExecuteCommand.java | 2 +-
.../starter/seatunnel/args/ClientCommandArgs.java | 12 ---
.../seatunnel/command/ClientExecuteCommand.java | 6 +-
.../seatunnel/args/ClientCommandArgsTest.java | 107 --------------------
.../common/container/AbstractTestContainer.java | 16 ++-
.../e2e/common/container/TestContainer.java | 4 +
.../flink/AbstractTestFlinkContainer.java | 8 +-
.../ConnectorPackageServiceContainer.java | 8 +-
.../container/seatunnel/SeaTunnelContainer.java | 8 +-
.../spark/AbstractTestSparkContainer.java | 9 +-
.../seatunnel/engine/e2e/UserVariableIT.java | 47 +++++++++
.../test/resources/fake_to_console.variables.conf | 62 ++++++++++++
.../seatunnel/engine/client/SeaTunnelClient.java | 24 ++++-
.../engine/client/SeaTunnelClientInstance.java | 15 +++
.../client/job/ClientJobExecutionEnvironment.java | 21 +++-
.../core/parse/MultipleTableJobConfigParser.java | 18 +++-
27 files changed, 539 insertions(+), 218 deletions(-)
diff --git a/docs/en/concept/config.md b/docs/en/concept/config.md
index c5f549ef98..aea170f79f 100644
--- a/docs/en/concept/config.md
+++ b/docs/en/concept/config.md
@@ -206,6 +206,116 @@ configured with these two parameters, because in
SeaTunnel, there is a default c
parameters are not configured, then the generated data from the last module of
the previous node will be used.
This is much more convenient when there is only one source.
+## Config variable substitution
+
+In config file we can define some variables and replace it in run time. **This
is only support `hocon` format file**.
+
+```hocon
+env {
+ job.mode = "BATCH"
+ job.name = ${jobName}
+ parallelism = 2
+}
+
+source {
+ FakeSource {
+ result_table_name = ${resName}
+ row.num = ${rowNum}
+ string.template = ${strTemplate}
+ int.template = [20, 21]
+ schema = {
+ fields {
+ name = ${nameType}
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+ sql {
+ source_table_name = "fake"
+ result_table_name = "sql"
+ query = "select * from "${resName}" where name = '"${nameVal}"' "
+ }
+
+}
+
+sink {
+ Console {
+ source_table_name = "sql"
+ username = ${username}
+ password = ${password}
+ blankSpace = ${blankSpace}
+ }
+}
+
+```
+
+In the above config, we define some variables, like `${rowNum}`, `${resName}`.
+We can replace those parameters with this shell command:
+
+```shell
+./bin/seatunnel.sh -c <this_config_file>
+-i jobName='st var job'
+-i resName=fake
+-i rowNum=10
+-i strTemplate=['abc','d~f','h i']
+-i nameType=string
+-i nameVal=abc
+-i username=seatunnel=2.3.1
+-i password='$a^b%c.d~e0*9('
+-i blankSpace='2023-12-26 11:30:00'
+-e local
+```
+
+Then the final submitted config is:
+
+```hocon
+env {
+ job.mode = "BATCH"
+ job.name = "st var job"
+ parallelism = 2
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 10
+ string.template = ["abc","d~f","h i"]
+ int.template = [20, 21]
+ schema = {
+ fields {
+ name = string
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+ sql {
+ source_table_name = "fake"
+ result_table_name = "sql"
+ query = "select * from fake where name = 'abc' "
+ }
+
+}
+
+sink {
+ Console {
+ source_table_name = "sql"
+ username = "seatunnel=2.3.1"
+ password = "$a^b%c.d~e0*9("
+ blankSpace = "2023-12-26 11:30:00"
+ }
+}
+```
+
+Some Notes:
+- quota with `'` if the value has space ` ` or special character (like `(`)
+- if the replacement variables is in `"` or `'`, like `resName` and `nameVal`,
you need add `"`
+
## What's More
If you want to know the details of this format configuration, Please
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/ConfEncryptCommand.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/ConfEncryptCommand.java
index 3fef617568..f94b235599 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/ConfEncryptCommand.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/ConfEncryptCommand.java
@@ -30,6 +30,7 @@ import lombok.extern.slf4j.Slf4j;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Objects;
import static
org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist;
@@ -53,10 +54,18 @@ public class ConfEncryptCommand implements
Command<AbstractCommandArgs> {
checkConfigExist(configPath);
Config config =
ConfigFactory.parseFile(configPath.toFile())
-
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
- .resolveWith(
- ConfigFactory.systemProperties(),
-
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true));
+ if (abstractCommandArgs.getVariables() != null) {
+ abstractCommandArgs.getVariables().stream()
+ .filter(Objects::nonNull)
+ .map(variable -> variable.split("=", 2))
+ .filter(pair -> pair.length == 2)
+ .forEach(pair -> System.setProperty(pair[0], pair[1]));
+ config =
+ config.resolveWith(
+ ConfigFactory.systemProperties(),
+
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+ }
Config encryptConfig = ConfigShadeUtils.encryptConfig(config);
log.info(
"Encrypt config: \n{}",
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
index 0859690feb..c667a0431e 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
@@ -19,8 +19,10 @@ package org.apache.seatunnel.core.starter.utils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigParseOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+import org.apache.seatunnel.shade.com.typesafe.config.impl.Parseable;
import org.apache.seatunnel.api.configuration.ConfigAdapter;
@@ -29,7 +31,9 @@ import lombok.extern.slf4j.Slf4j;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
/** Used to build the {@link Config} from config file. */
@@ -43,14 +47,11 @@ public class ConfigBuilder {
// utility class and cannot be instantiated
}
- private static Config ofInner(@NonNull Path filePath) {
+ private static Config ofInner(@NonNull Path filePath, List<String>
variables) {
Config config =
ConfigFactory.parseFile(filePath.toFile())
-
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
- .resolveWith(
- ConfigFactory.systemProperties(),
-
ConfigResolveOptions.defaults().setAllowUnresolved(true));
- return ConfigShadeUtils.decryptConfig(config);
+
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true));
+ return ConfigShadeUtils.decryptConfig(backfillUserVariables(config,
variables));
}
public static Config of(@NonNull String filePath) {
@@ -58,13 +59,22 @@ public class ConfigBuilder {
return of(path);
}
+ public static Config of(@NonNull String filePath, List<String> variables) {
+ Path path = Paths.get(filePath);
+ return of(path, variables);
+ }
+
public static Config of(@NonNull Path filePath) {
+ return of(filePath, null);
+ }
+
+ public static Config of(@NonNull Path filePath, List<String> variables) {
log.info("Loading config file from path: {}", filePath);
Optional<ConfigAdapter> adapterSupplier =
ConfigAdapterUtils.selectAdapter(filePath);
Config config =
adapterSupplier
- .map(adapter -> of(adapter, filePath))
- .orElseGet(() -> ofInner(filePath));
+ .map(adapter -> of(adapter, filePath, variables))
+ .orElseGet(() -> ofInner(filePath, variables));
log.info("Parsed config file: \n{}",
config.root().render(CONFIG_RENDER_OPTIONS));
return config;
}
@@ -88,17 +98,38 @@ public class ConfigBuilder {
return config;
}
- public static Config of(@NonNull ConfigAdapter configAdapter, @NonNull
Path filePath) {
+ public static Config of(
+ @NonNull ConfigAdapter configAdapter, @NonNull Path filePath,
List<String> variables) {
log.info("With config adapter spi {}",
configAdapter.getClass().getName());
try {
Map<String, Object> flattenedMap =
configAdapter.loadConfig(filePath);
Config config = ConfigFactory.parseMap(flattenedMap);
- return ConfigShadeUtils.decryptConfig(config);
+ return
ConfigShadeUtils.decryptConfig(backfillUserVariables(config, variables));
} catch (Exception warn) {
log.warn(
"Loading config failed with spi {}, fallback to HOCON
loader.",
configAdapter.getClass().getName());
- return ofInner(filePath);
+ return ofInner(filePath, variables);
+ }
+ }
+
+ private static Config backfillUserVariables(Config config, List<String>
variables) {
+ if (variables != null) {
+ variables.stream()
+ .filter(Objects::nonNull)
+ .map(variable -> variable.split("=", 2))
+ .filter(pair -> pair.length == 2)
+ .forEach(pair -> System.setProperty(pair[0], pair[1]));
+ Config systemConfig =
+ Parseable.newProperties(
+ System.getProperties(),
+ ConfigParseOptions.defaults()
+ .setOriginDescription("system
properties"))
+ .parse()
+ .toConfig();
+ return config.resolveWith(
+ systemConfig,
ConfigResolveOptions.defaults().setAllowUnresolved(true));
}
+ return config;
}
}
diff --git
a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java
b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java
index e25781b910..9382c68663 100644
---
a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java
+++
b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.starter.utils;
import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
import org.apache.seatunnel.api.configuration.ConfigShade;
import org.apache.seatunnel.common.utils.JsonUtils;
@@ -67,6 +68,55 @@ public class ConfigShadeTest {
config.getConfigList("source").get(0).getString("password"),
PASSWORD);
}
+ @Test
+ public void testVariableReplacement() throws URISyntaxException {
+ String jobName = "seatunnel variable test job";
+ String resName = "fake";
+ int rowNum = 10;
+ String nameType = "string";
+ String username = "seatunnel=2.3.1";
+ String password = "$a^b%c.d~e0*9(";
+ String blankSpace = "2023-12-26 11:30:00";
+ List<String> variables = new ArrayList<>();
+ variables.add("jobName=" + jobName);
+ variables.add("resName=" + resName);
+ variables.add("rowNum=" + rowNum);
+ variables.add("strTemplate=[abc,de~,f h]");
+ variables.add("nameType=" + nameType);
+ variables.add("nameVal=abc");
+ variables.add("username=" + username);
+ variables.add("password=" + password);
+ variables.add("blankSpace=" + blankSpace);
+ URL resource =
ConfigShadeTest.class.getResource("/config.variables.conf");
+ Assertions.assertNotNull(resource);
+ Config config = ConfigBuilder.of(Paths.get(resource.toURI()),
variables);
+ Config envConfig = config.getConfig("env");
+ Assertions.assertEquals(envConfig.getString("job.name"), jobName);
+ List<? extends ConfigObject> sourceConfigs =
config.getObjectList("source");
+ for (ConfigObject configObject : sourceConfigs) {
+ Config sourceConfig = configObject.toConfig();
+ List<String> list1 = sourceConfig.getStringList("string.template");
+ Assertions.assertEquals(list1.get(0), "abc");
+ Assertions.assertEquals(list1.get(1), "de~");
+ Assertions.assertEquals(list1.get(2), "f h");
+ Assertions.assertEquals(sourceConfig.getInt("row.num"), rowNum);
+
Assertions.assertEquals(sourceConfig.getString("result_table_name"), resName);
+ }
+ List<? extends ConfigObject> transformConfigs =
config.getObjectList("transform");
+ for (ConfigObject configObject : transformConfigs) {
+ Config transformConfig = configObject.toConfig();
+ Assertions.assertEquals(
+ transformConfig.getString("query"), "select * from fake
where name = 'abc' ");
+ }
+ List<? extends ConfigObject> sinkConfigs =
config.getObjectList("sink");
+ for (ConfigObject sinkObject : sinkConfigs) {
+ Config sinkConfig = sinkObject.toConfig();
+ Assertions.assertEquals(sinkConfig.getString("username"),
username);
+ Assertions.assertEquals(sinkConfig.getString("password"),
password);
+ Assertions.assertEquals(sinkConfig.getString("blankSpace"),
blankSpace);
+ }
+ }
+
@Test
public void testDecryptAndEncrypt() {
String encryptUsername = ConfigShadeUtils.encryptOption("base64",
USERNAME);
diff --git
a/seatunnel-core/seatunnel-core-starter/src/test/resources/config.variables.conf
b/seatunnel-core/seatunnel-core-starter/src/test/resources/config.variables.conf
new file mode 100644
index 0000000000..7355dcc29a
--- /dev/null
+++
b/seatunnel-core/seatunnel-core-starter/src/test/resources/config.variables.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+ job.name = ${jobName}
+ parallelism = 2
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ result_table_name = ${resName}
+ row.num = ${rowNum}
+ string.template = ${strTemplate}
+ int.template = [20, 21]
+ schema = {
+ fields {
+ name = ${nameType}
+ age = "int"
+ }
+ }
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+transform {
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/category/transform-v2
+ sql {
+ source_table_name = "fake"
+ result_table_name = "sql"
+ query = "select * from "${resName}" where name = '"${nameVal}"' "
+ }
+
+}
+
+sink {
+ Console {
+ source_table_name = "sql"
+ username = ${username}
+ password = ${password}
+ blankSpace = ${blankSpace}
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index 5dc1d32cef..c244f2ff33 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -90,7 +90,7 @@ public class FlinkStarter implements Starter {
flinkCommandArgs.getVariables().stream()
.filter(Objects::nonNull)
.map(String::trim)
- .forEach(variable -> command.add("-D" + variable));
+ .forEach(variable -> command.add("-i " + variable));
return command;
}
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index 7373cb58ed..e74bbd402f 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -90,7 +90,7 @@ public class FlinkStarter implements Starter {
flinkCommandArgs.getVariables().stream()
.filter(Objects::nonNull)
.map(String::trim)
- .forEach(variable -> command.add("-D" + variable));
+ .forEach(variable -> command.add("-i " + variable));
return command;
}
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
index f8539af752..e831fb081b 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
@@ -48,7 +48,7 @@ public class FlinkTaskExecuteCommand implements
Command<FlinkCommandArgs> {
public void execute() throws CommandExecuteException {
Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
checkConfigExist(configFile);
- Config config = ConfigBuilder.of(configFile);
+ Config config = ConfigBuilder.of(configFile,
flinkCommandArgs.getVariables());
// if user specified job name using command line arguments, override
config option
if (!flinkCommandArgs.getJobName().equals(Constants.LOGO)) {
config =
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 1b8918976b..f2c20e8408 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -18,8 +18,6 @@
package org.apache.seatunnel.core.starter.spark;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
@@ -129,40 +127,12 @@ public class SparkStarter implements Starter {
/** parse spark configurations from SeaTunnel config file */
private void setSparkConf() throws FileNotFoundException {
- commandArgs.getVariables().stream()
- .filter(Objects::nonNull)
- .map(variable -> variable.split("=", 2))
- .filter(pair -> pair.length == 2)
- .forEach(pair -> System.setProperty(pair[0], pair[1]));
- this.sparkConf = getSparkConf(commandArgs.getConfigFile());
- String driverJavaOpts =
this.sparkConf.getOrDefault("spark.driver.extraJavaOptions", "");
- String executorJavaOpts =
- this.sparkConf.getOrDefault("spark.executor.extraJavaOptions",
"");
- if (!commandArgs.getVariables().isEmpty()) {
- String properties =
- commandArgs.getVariables().stream()
- .map(v -> "-D" + v)
- .collect(Collectors.joining(" "));
- driverJavaOpts += " " + properties;
- executorJavaOpts += " " + properties;
- this.sparkConf.put("spark.driver.extraJavaOptions",
driverJavaOpts.trim());
- this.sparkConf.put("spark.executor.extraJavaOptions",
executorJavaOpts.trim());
- }
+ this.sparkConf = getSparkConf(commandArgs.getConfigFile(),
commandArgs.getVariables());
}
/** Get spark configurations from SeaTunnel job config file. */
- static Map<String, String> getSparkConf(String configFile) throws
FileNotFoundException {
- File file = new File(configFile);
- if (!file.exists()) {
- throw new FileNotFoundException("config file '" + file + "' does
not exists!");
- }
- Config appConfig =
- ConfigFactory.parseFile(file)
-
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
- .resolveWith(
- ConfigFactory.systemProperties(),
-
ConfigResolveOptions.defaults().setAllowUnresolved(true));
-
+ static Map<String, String> getSparkConf(String configFile, List<String>
variables) {
+ Config appConfig = ConfigBuilder.of(configFile, variables);
return appConfig.getConfig("env").entrySet().stream()
.collect(
Collectors.toMap(
@@ -175,7 +145,7 @@ public class SparkStarter implements Starter {
if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir))
{
return Collections.emptyList();
}
- Config config = ConfigBuilder.of(commandArgs.getConfigFile());
+ Config config = ConfigBuilder.of(commandArgs.getConfigFile(),
commandArgs.getVariables());
Set<URL> pluginJars = new HashSet<>();
SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
new SeaTunnelSourcePluginDiscovery();
@@ -217,6 +187,10 @@ public class SparkStarter implements Starter {
if (this.commandArgs.isCheckConfig()) {
commands.add("--check");
}
+ this.commandArgs.getVariables().stream()
+ .filter(Objects::nonNull)
+ .map(String::trim)
+ .forEach(variable -> commands.add("-i " + variable));
return commands;
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkStarterTest.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkStarterTest.java
index 41bff86d30..7189fb8a60 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkStarterTest.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkStarterTest.java
@@ -33,7 +33,7 @@ public class SparkStarterTest {
public void testGetSparkConf() throws URISyntaxException,
FileNotFoundException {
URI uri =
ClassLoader.getSystemResource("spark_application.conf").toURI();
String file = new File(uri).toString();
- Map<String, String> sparkConf = SparkStarter.getSparkConf(file);
+ Map<String, String> sparkConf = SparkStarter.getSparkConf(file, null);
assertEquals("SeaTunnel", sparkConf.get("job.name"));
assertEquals("1", sparkConf.get("spark.executor.cores"));
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index c33544873a..790a20191d 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -18,8 +18,6 @@
package org.apache.seatunnel.core.starter.spark;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
@@ -129,40 +127,12 @@ public class SparkStarter implements Starter {
/** parse spark configurations from SeaTunnel config file */
private void setSparkConf() throws FileNotFoundException {
- commandArgs.getVariables().stream()
- .filter(Objects::nonNull)
- .map(variable -> variable.split("=", 2))
- .filter(pair -> pair.length == 2)
- .forEach(pair -> System.setProperty(pair[0], pair[1]));
- this.sparkConf = getSparkConf(commandArgs.getConfigFile());
- String driverJavaOpts =
this.sparkConf.getOrDefault("spark.driver.extraJavaOptions", "");
- String executorJavaOpts =
- this.sparkConf.getOrDefault("spark.executor.extraJavaOptions",
"");
- if (!commandArgs.getVariables().isEmpty()) {
- String properties =
- commandArgs.getVariables().stream()
- .map(v -> "-D" + v)
- .collect(Collectors.joining(" "));
- driverJavaOpts += " " + properties;
- executorJavaOpts += " " + properties;
- this.sparkConf.put("spark.driver.extraJavaOptions",
driverJavaOpts.trim());
- this.sparkConf.put("spark.executor.extraJavaOptions",
executorJavaOpts.trim());
- }
+ this.sparkConf = getSparkConf(commandArgs.getConfigFile(),
commandArgs.getVariables());
}
/** Get spark configurations from SeaTunnel job config file. */
- static Map<String, String> getSparkConf(String configFile) throws
FileNotFoundException {
- File file = new File(configFile);
- if (!file.exists()) {
- throw new FileNotFoundException("config file '" + file + "' does
not exists!");
- }
- Config appConfig =
- ConfigFactory.parseFile(file)
-
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
- .resolveWith(
- ConfigFactory.systemProperties(),
-
ConfigResolveOptions.defaults().setAllowUnresolved(true));
-
+ static Map<String, String> getSparkConf(String configFile, List<String>
variables) {
+ Config appConfig = ConfigBuilder.of(configFile, variables);
return appConfig.getConfig("env").entrySet().stream()
.collect(
Collectors.toMap(
@@ -175,7 +145,7 @@ public class SparkStarter implements Starter {
if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir))
{
return Collections.emptyList();
}
- Config config = ConfigBuilder.of(commandArgs.getConfigFile());
+ Config config = ConfigBuilder.of(commandArgs.getConfigFile(),
commandArgs.getVariables());
Set<URL> pluginJars = new HashSet<>();
SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
new SeaTunnelSourcePluginDiscovery();
@@ -217,6 +187,10 @@ public class SparkStarter implements Starter {
if (this.commandArgs.isCheckConfig()) {
commands.add("--check");
}
+ this.commandArgs.getVariables().stream()
+ .filter(Objects::nonNull)
+ .map(String::trim)
+ .forEach(variable -> commands.add("-i " + variable));
return commands;
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
index 1f5f4242c5..ea36bc0777 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
@@ -48,7 +48,7 @@ public class SparkTaskExecuteCommand implements
Command<SparkCommandArgs> {
public void execute() throws CommandExecuteException {
Path configFile = FileUtils.getConfigPath(sparkCommandArgs);
checkConfigExist(configFile);
- Config config = ConfigBuilder.of(configFile);
+ Config config = ConfigBuilder.of(configFile,
sparkCommandArgs.getVariables());
if (!sparkCommandArgs.getJobName().equals(Constants.LOGO)) {
config =
config.withValue(
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index eefcc1a0a6..9504c9cb2c 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -37,7 +37,6 @@ import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
@EqualsAndHashCode(callSuper = true)
@Data
@@ -103,7 +102,6 @@ public class ClientCommandArgs extends AbstractCommandArgs {
@Override
public Command<?> buildCommand() {
Common.setDeployMode(getDeployMode());
- userParamsToSysEnv();
if (checkConfig) {
return new SeaTunnelConfValidateCommand(this);
}
@@ -116,16 +114,6 @@ public class ClientCommandArgs extends AbstractCommandArgs
{
return new ClientExecuteCommand(this);
}
- private void userParamsToSysEnv() {
- if (!this.variables.isEmpty()) {
- variables.stream()
- .filter(Objects::nonNull)
- .map(variable -> variable.split("=", 2))
- .filter(pair -> pair.length == 2)
- .forEach(pair -> System.setProperty(pair[0], pair[1]));
- }
- }
-
public DeployMode getDeployMode() {
return DeployMode.CLIENT;
}
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index 980832f4de..d1e8b78009 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -134,13 +134,17 @@ public class ClientExecuteCommand implements
Command<ClientCommandArgs> {
jobExecutionEnv =
engineClient.restoreExecutionContext(
configFile.toString(),
+ clientCommandArgs.getVariables(),
jobConfig,
seaTunnelConfig,
Long.parseLong(clientCommandArgs.getRestoreJobId()));
} else {
jobExecutionEnv =
engineClient.createExecutionContext(
- configFile.toString(), jobConfig,
seaTunnelConfig);
+ configFile.toString(),
+ clientCommandArgs.getVariables(),
+ jobConfig,
+ seaTunnelConfig);
}
// get job start time
diff --git
a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
deleted file mode 100644
index 19fc639700..0000000000
---
a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.seatunnel.args;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
-
-import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Paths;
-import java.util.List;
-
-public class ClientCommandArgsTest {
- @Test
- public void testUserDefinedParamsCommand() throws URISyntaxException {
- int fakeParallelism = 16;
- String username = "seatunnel=2.3.1";
- String password = "dsjr42=4wfskahdsd=w1chh";
- String fakeSourceTable = "fake";
- String fakeSinkTable = "sink";
- String list = "[par1=20230829,par2=20230829]";
- String blankSpace = "2023-12-26 11:30:00";
- String[] args = {
- "-c",
- "/args/user_defined_params.conf",
- "-e",
- "local",
- "-i",
- "fake_source_table=" + fakeSourceTable,
- "-i",
- "fake_parallelism=" + fakeParallelism,
- "-i",
- "fake_sink_table=" + fakeSinkTable,
- "-i",
- "password=" + password,
- "-i",
- "username=" + username,
- "-i",
- "blankSpace=" + blankSpace,
- "-i",
- "list=" + list,
- "-i",
- "sql=" + "\"select a , b from fake_source_table\""
- };
- ClientCommandArgs clientCommandArgs =
- CommandLineUtils.parse(args, new ClientCommandArgs(),
"seatunnel-zeta", true);
- clientCommandArgs.buildCommand();
- URL resource =
ClientCommandArgsTest.class.getResource("/args/user_defined_params.conf");
-
- Config config =
- ConfigFactory.parseFile(Paths.get(resource.toURI()).toFile())
-
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
- .resolveWith(
- ConfigFactory.systemProperties(),
-
ConfigResolveOptions.defaults().setAllowUnresolved(true));
- List<? extends ConfigObject> sourceConfigs =
config.getObjectList("source");
- for (ConfigObject configObject : sourceConfigs) {
- Config sourceConfig = configObject.toConfig();
-
- String tableName = sourceConfig.getString("result_table_name");
- Assertions.assertEquals(tableName, fakeSourceTable);
-
- int parallelism =
Integer.parseInt(sourceConfig.getString("parallelism"));
- Assertions.assertEquals(fakeParallelism, parallelism);
-
- Assertions.assertEquals(sourceConfig.getString("username"),
username);
- Assertions.assertEquals(sourceConfig.getString("password"),
password);
- }
- List<? extends ConfigObject> sinkConfigs =
config.getObjectList("sink");
- for (ConfigObject sinkObject : sinkConfigs) {
- Config sinkConfig = sinkObject.toConfig();
- String tableName = sinkConfig.getString("result_table_name");
- Assertions.assertEquals(tableName, fakeSinkTable);
-
- Assertions.assertEquals(sinkConfig.getString("username"),
username);
- Assertions.assertEquals(sinkConfig.getString("password"),
password);
- List<String> list1 = sinkConfig.getStringList("list");
- Assertions.assertEquals(list1.get(0), "par1=20230829");
- Assertions.assertEquals(list1.get(1), "par2=20230829");
- String sql = sinkConfig.getString("sql");
- Assertions.assertEquals(sql, "\"select a , b from
fake_source_table\"");
- Assertions.assertEquals(sinkConfig.getString("blankSpace"),
blankSpace);
- }
- }
-}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
index 47e898f5ca..b033144d58 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
@@ -95,6 +95,12 @@ public abstract class AbstractTestContainer implements
TestContainer {
protected Container.ExecResult executeJob(GenericContainer<?> container,
String confFile)
throws IOException, InterruptedException {
+ return executeJob(container, confFile, null);
+ }
+
+ protected Container.ExecResult executeJob(
+ GenericContainer<?> container, String confFile, List<String>
variables)
+ throws IOException, InterruptedException {
final String confInContainerPath =
copyConfigFileToContainer(container, confFile);
// copy connectors
copyConnectorJarToContainer(
@@ -110,7 +116,15 @@ public abstract class AbstractTestContainer implements
TestContainer {
command.add(adaptPathForWin(binPath));
command.add("--config");
command.add(adaptPathForWin(confInContainerPath));
- command.addAll(getExtraStartShellCommands());
+ List<String> extraStartShellCommands = new
ArrayList<>(getExtraStartShellCommands());
+ if (variables != null && !variables.isEmpty()) {
+ variables.forEach(
+ v -> {
+ extraStartShellCommands.add("-i");
+ extraStartShellCommands.add(v);
+ });
+ }
+ command.addAll(extraStartShellCommands);
return executeCommand(container, command);
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
index e277a7a822..33b196eeba 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
@@ -23,6 +23,7 @@ import org.testcontainers.containers.Container;
import org.testcontainers.containers.Network;
import java.io.IOException;
+import java.util.List;
public interface TestContainer extends TestResource {
@@ -35,6 +36,9 @@ public interface TestContainer extends TestResource {
Container.ExecResult executeJob(String confFile) throws IOException,
InterruptedException;
+ Container.ExecResult executeJob(String confFile, List<String> variables)
+ throws IOException, InterruptedException;
+
default Container.ExecResult executeConnectorCheck(String[] args)
throws IOException, InterruptedException {
throw new UnsupportedOperationException("Not implemented");
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index 91320b3d9a..7145da6242 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -149,8 +149,14 @@ public abstract class AbstractTestFlinkContainer extends
AbstractTestContainer {
@Override
public Container.ExecResult executeJob(String confFile)
throws IOException, InterruptedException {
+ return executeJob(confFile, null);
+ }
+
+ @Override
+ public Container.ExecResult executeJob(String confFile, List<String>
variables)
+ throws IOException, InterruptedException {
log.info("test in container: {}", identifier());
- return executeJob(jobManager, confFile);
+ return executeJob(jobManager, confFile, variables);
}
@Override
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
index 4e30d2c92b..4f5ea99029 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
@@ -220,8 +220,14 @@ public class ConnectorPackageServiceContainer extends
AbstractTestContainer {
@Override
public Container.ExecResult executeJob(String confFile)
throws IOException, InterruptedException {
+ return executeJob(confFile, null);
+ }
+
+ @Override
+ public Container.ExecResult executeJob(String confFile, List<String>
variables)
+ throws IOException, InterruptedException {
log.info("test in container: {}", identifier());
- return executeJob(server1, confFile);
+ return executeJob(server1, confFile, variables);
}
@Override
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 37af5011ef..f9a139b040 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -267,10 +267,16 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
@Override
public Container.ExecResult executeJob(String confFile)
throws IOException, InterruptedException {
+ return executeJob(confFile, null);
+ }
+
+ @Override
+ public Container.ExecResult executeJob(String confFile, List<String>
variables)
+ throws IOException, InterruptedException {
log.info("test in container: {}", identifier());
List<String> beforeThreads = ContainerUtil.getJVMThreadNames(server);
runningCount.incrementAndGet();
- Container.ExecResult result = executeJob(server, confFile);
+ Container.ExecResult result = executeJob(server, confFile, variables);
if (runningCount.decrementAndGet() > 0) {
// only check thread when job all finished.
return result;
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
index bd3c1aeebc..fe07d082af 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
@@ -101,10 +101,17 @@ public abstract class AbstractTestSparkContainer extends
AbstractTestContainer {
extendedFactory.extend(master);
}
+ @Override
public Container.ExecResult executeJob(String confFile)
throws IOException, InterruptedException {
+ return executeJob(confFile, null);
+ }
+
+ @Override
+ public Container.ExecResult executeJob(String confFile, List<String>
variables)
+ throws IOException, InterruptedException {
log.info("test in container: {}", identifier());
- return executeJob(master, confFile);
+ return executeJob(master, confFile, variables);
}
@Override
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UserVariableIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UserVariableIT.java
new file mode 100644
index 0000000000..03455af2b0
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UserVariableIT.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UserVariableIT extends TestSuiteBase {
+
+ @TestTemplate
+ public void userVariableTest(TestContainer container) throws IOException,
InterruptedException {
+ List<String> variables = new ArrayList<>();
+ String list = "[abc,def]";
+ variables.add("resName=fake");
+ variables.add("rowNum=10");
+ variables.add("strTemplate=" + list);
+ variables.add("nameType=string");
+ variables.add("nameVal=abc");
+ variables.add("sourceTableName=sql");
+ Container.ExecResult execResult =
+ container.executeJob("/fake_to_console.variables.conf",
variables);
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake_to_console.variables.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake_to_console.variables.conf
new file mode 100644
index 0000000000..48f7ec548b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake_to_console.variables.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+ parallelism = 2
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ result_table_name = ${resName}
+ row.num = ${rowNum}
+ string.template = ${strTemplate}
+ schema = {
+ fields {
+ name = ${nameType}
+ age = "int"
+ }
+ }
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+transform {
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/category/transform-v2
+ sql {
+ source_table_name = "fake"
+ query = "select * from "${resName}" where name = '"${nameVal}"' "
+ result_table_name = "sql"
+ }
+}
+
+sink {
+ Console {
+ source_table_name = ${sourceTableName}
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 22aa0ffd13..2d7508ee2d 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -36,6 +36,7 @@ import lombok.NonNull;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -53,18 +54,37 @@ public class SeaTunnelClient implements
SeaTunnelClientInstance, AutoCloseable {
@NonNull String filePath,
@NonNull JobConfig jobConfig,
@NonNull SeaTunnelConfig seaTunnelConfig) {
+ return createExecutionContext(filePath, null, jobConfig,
seaTunnelConfig);
+ }
+
+ @Override
+ public ClientJobExecutionEnvironment createExecutionContext(
+ @NonNull String filePath,
+ List<String> variables,
+ @NonNull JobConfig jobConfig,
+ @NonNull SeaTunnelConfig seaTunnelConfig) {
return new ClientJobExecutionEnvironment(
- jobConfig, filePath, hazelcastClient, seaTunnelConfig);
+ jobConfig, filePath, variables, hazelcastClient,
seaTunnelConfig);
+ }
+
+ @Override
+ public ClientJobExecutionEnvironment restoreExecutionContext(
+ @NonNull String filePath,
+ @NonNull JobConfig jobConfig,
+ @NonNull SeaTunnelConfig seaTunnelConfig,
+ @NonNull Long jobId) {
+ return restoreExecutionContext(filePath, null, jobConfig,
seaTunnelConfig, jobId);
}
@Override
public ClientJobExecutionEnvironment restoreExecutionContext(
@NonNull String filePath,
+ List<String> variables,
@NonNull JobConfig jobConfig,
@NonNull SeaTunnelConfig seaTunnelConfig,
@NonNull Long jobId) {
return new ClientJobExecutionEnvironment(
- jobConfig, filePath, hazelcastClient, seaTunnelConfig, true,
jobId);
+ jobConfig, filePath, variables, hazelcastClient,
seaTunnelConfig, true, jobId);
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
index 1f525ff73b..36a3f2e36e 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
@@ -24,6 +24,8 @@ import
org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import lombok.NonNull;
+import java.util.List;
+
public interface SeaTunnelClientInstance {
ClientJobExecutionEnvironment createExecutionContext(
@@ -31,8 +33,21 @@ public interface SeaTunnelClientInstance {
@NonNull JobConfig config,
@NonNull SeaTunnelConfig seaTunnelConfig);
+ ClientJobExecutionEnvironment createExecutionContext(
+ @NonNull String filePath,
+ List<String> variables,
+ @NonNull JobConfig config,
+ @NonNull SeaTunnelConfig seaTunnelConfig);
+
+ ClientJobExecutionEnvironment restoreExecutionContext(
+ @NonNull String filePath,
+ @NonNull JobConfig config,
+ @NonNull SeaTunnelConfig seaTunnelConfig,
+ @NonNull Long jobId);
+
ClientJobExecutionEnvironment restoreExecutionContext(
@NonNull String filePath,
+ List<String> variables,
@NonNull JobConfig config,
@NonNull SeaTunnelConfig seaTunnelConfig,
@NonNull Long jobId);
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
index d92f9722dc..18f1a7376f 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
@@ -42,6 +42,8 @@ public class ClientJobExecutionEnvironment extends
AbstractJobEnvironment {
private final String jobFilePath;
+ private final List<String> variables;
+
private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;
private final JobClient jobClient;
@@ -54,12 +56,14 @@ public class ClientJobExecutionEnvironment extends
AbstractJobEnvironment {
public ClientJobExecutionEnvironment(
JobConfig jobConfig,
String jobFilePath,
+ List<String> variables,
SeaTunnelHazelcastClient seaTunnelHazelcastClient,
SeaTunnelConfig seaTunnelConfig,
boolean isStartWithSavePoint,
Long jobId) {
super(jobConfig, isStartWithSavePoint);
this.jobFilePath = jobFilePath;
+ this.variables = variables;
this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
this.jobClient = new JobClient(seaTunnelHazelcastClient);
this.seaTunnelConfig = seaTunnelConfig;
@@ -71,16 +75,29 @@ public class ClientJobExecutionEnvironment extends
AbstractJobEnvironment {
public ClientJobExecutionEnvironment(
JobConfig jobConfig,
String jobFilePath,
+ List<String> variables,
SeaTunnelHazelcastClient seaTunnelHazelcastClient,
SeaTunnelConfig seaTunnelConfig) {
- this(jobConfig, jobFilePath, seaTunnelHazelcastClient,
seaTunnelConfig, false, null);
+ this(
+ jobConfig,
+ jobFilePath,
+ variables,
+ seaTunnelHazelcastClient,
+ seaTunnelConfig,
+ false,
+ null);
}
/** Search all jars in SEATUNNEL_HOME/plugins */
@Override
protected MultipleTableJobConfigParser getJobConfigParser() {
return new MultipleTableJobConfigParser(
- jobFilePath, idGenerator, jobConfig, commonPluginJars,
isStartWithSavePoint);
+ jobFilePath,
+ variables,
+ idGenerator,
+ jobConfig,
+ commonPluginJars,
+ isStartWithSavePoint);
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 883c7c59fa..6e5fa3decd 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -123,11 +123,27 @@ public class MultipleTableJobConfigParser {
JobConfig jobConfig,
List<URL> commonPluginJars,
boolean isStartWithSavePoint) {
+ this(
+ jobDefineFilePath,
+ null,
+ idGenerator,
+ jobConfig,
+ commonPluginJars,
+ isStartWithSavePoint);
+ }
+
+ public MultipleTableJobConfigParser(
+ String jobDefineFilePath,
+ List<String> variables,
+ IdGenerator idGenerator,
+ JobConfig jobConfig,
+ List<URL> commonPluginJars,
+ boolean isStartWithSavePoint) {
this.idGenerator = idGenerator;
this.jobConfig = jobConfig;
this.commonPluginJars = commonPluginJars;
this.isStartWithSavePoint = isStartWithSavePoint;
- this.seaTunnelJobConfig =
ConfigBuilder.of(Paths.get(jobDefineFilePath));
+ this.seaTunnelJobConfig =
ConfigBuilder.of(Paths.get(jobDefineFilePath), variables);
this.envOptions =
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
this.fallbackParser =
new JobConfigParser(idGenerator, commonPluginJars,
isStartWithSavePoint);