This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e2a4b64148 [Feature][starter] support user define parameter on 
spark/flink engine (#6387)
e2a4b64148 is described below

commit e2a4b6414887684c2ad35b9ed6bbf353d25a34ec
Author: Jarvis <[email protected]>
AuthorDate: Tue Apr 30 09:12:14 2024 +0800

    [Feature][starter] support user define parameter on spark/flink engine 
(#6387)
---
 docs/en/concept/config.md                          | 110 +++++++++++++++++++++
 .../core/starter/command/ConfEncryptCommand.java   |  17 +++-
 .../core/starter/utils/ConfigBuilder.java          |  53 +++++++---
 .../core/starter/utils/ConfigShadeTest.java        |  50 ++++++++++
 .../src/test/resources/config.variables.conf       |  68 +++++++++++++
 .../seatunnel/core/starter/flink/FlinkStarter.java |   2 +-
 .../seatunnel/core/starter/flink/FlinkStarter.java |   2 +-
 .../flink/command/FlinkTaskExecuteCommand.java     |   2 +-
 .../seatunnel/core/starter/spark/SparkStarter.java |  42 ++------
 .../core/starter/spark/SparkStarterTest.java       |   2 +-
 .../seatunnel/core/starter/spark/SparkStarter.java |  42 ++------
 .../spark/command/SparkTaskExecuteCommand.java     |   2 +-
 .../starter/seatunnel/args/ClientCommandArgs.java  |  12 ---
 .../seatunnel/command/ClientExecuteCommand.java    |   6 +-
 .../seatunnel/args/ClientCommandArgsTest.java      | 107 --------------------
 .../common/container/AbstractTestContainer.java    |  16 ++-
 .../e2e/common/container/TestContainer.java        |   4 +
 .../flink/AbstractTestFlinkContainer.java          |   8 +-
 .../ConnectorPackageServiceContainer.java          |   8 +-
 .../container/seatunnel/SeaTunnelContainer.java    |   8 +-
 .../spark/AbstractTestSparkContainer.java          |   9 +-
 .../seatunnel/engine/e2e/UserVariableIT.java       |  47 +++++++++
 .../test/resources/fake_to_console.variables.conf  |  62 ++++++++++++
 .../seatunnel/engine/client/SeaTunnelClient.java   |  24 ++++-
 .../engine/client/SeaTunnelClientInstance.java     |  15 +++
 .../client/job/ClientJobExecutionEnvironment.java  |  21 +++-
 .../core/parse/MultipleTableJobConfigParser.java   |  18 +++-
 27 files changed, 539 insertions(+), 218 deletions(-)

diff --git a/docs/en/concept/config.md b/docs/en/concept/config.md
index c5f549ef98..aea170f79f 100644
--- a/docs/en/concept/config.md
+++ b/docs/en/concept/config.md
@@ -206,6 +206,116 @@ configured with these two parameters, because in 
SeaTunnel, there is a default c
 parameters are not configured, then the generated data from the last module of 
the previous node will be used.
 This is much more convenient when there is only one source.
 
+## Config variable substitution
+
+In config file we can define some variables and replace it in run time. **This 
is only support `hocon` format file**.
+
+```hocon
+env {
+  job.mode = "BATCH"
+  job.name = ${jobName}
+  parallelism = 2
+}
+
+source {
+  FakeSource {
+    result_table_name = ${resName}
+    row.num = ${rowNum}
+    string.template = ${strTemplate}
+    int.template = [20, 21]
+    schema = {
+      fields {
+        name = ${nameType}
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+    sql {
+      source_table_name = "fake"
+      result_table_name = "sql"
+      query = "select * from "${resName}" where name = '"${nameVal}"' "
+    }
+
+}
+
+sink {
+  Console {
+     source_table_name = "sql"
+     username = ${username}
+     password = ${password}
+     blankSpace = ${blankSpace}
+  }
+}
+
+```
+
+In the above config, we define some variables, like `${rowNum}`, `${resName}`.
+We can replace those parameters with this shell command:
+
+```shell
+./bin/seatunnel.sh -c <this_config_file> 
+-i jobName='st var job' 
+-i resName=fake 
+-i rowNum=10 
+-i strTemplate=['abc','d~f','h i'] 
+-i nameType=string 
+-i nameVal=abc 
+-i username=seatunnel=2.3.1 
+-i password='$a^b%c.d~e0*9(' 
+-i blankSpace='2023-12-26 11:30:00' 
+-e local
+```
+
+Then the final submitted config is:
+
+```hocon
+env {
+  job.mode = "BATCH"
+  job.name = "st var job"
+  parallelism = 2
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 10
+    string.template = ["abc","d~f","h i"]
+    int.template = [20, 21]
+    schema = {
+      fields {
+        name = string
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+    sql {
+      source_table_name = "fake"
+      result_table_name = "sql"
+      query = "select * from fake where name = 'abc' "
+    }
+
+}
+
+sink {
+  Console {
+     source_table_name = "sql"
+     username = "seatunnel=2.3.1"
+     password = "$a^b%c.d~e0*9("
+     blankSpace = "2023-12-26 11:30:00"
+  }
+}
+```
+
+Some Notes:
+- quota with `'` if the value has space ` ` or special character (like `(`)
+- if the replacement variables is in `"` or `'`, like `resName` and `nameVal`, 
you need add `"`
+
 ## What's More
 
 If you want to know the details of this format configuration, Please
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/ConfEncryptCommand.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/ConfEncryptCommand.java
index 3fef617568..f94b235599 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/ConfEncryptCommand.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/ConfEncryptCommand.java
@@ -30,6 +30,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Objects;
 
 import static 
org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist;
 
@@ -53,10 +54,18 @@ public class ConfEncryptCommand implements 
Command<AbstractCommandArgs> {
         checkConfigExist(configPath);
         Config config =
                 ConfigFactory.parseFile(configPath.toFile())
-                        
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
-                        .resolveWith(
-                                ConfigFactory.systemProperties(),
-                                
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+                        
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true));
+        if (abstractCommandArgs.getVariables() != null) {
+            abstractCommandArgs.getVariables().stream()
+                    .filter(Objects::nonNull)
+                    .map(variable -> variable.split("=", 2))
+                    .filter(pair -> pair.length == 2)
+                    .forEach(pair -> System.setProperty(pair[0], pair[1]));
+            config =
+                    config.resolveWith(
+                            ConfigFactory.systemProperties(),
+                            
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+        }
         Config encryptConfig = ConfigShadeUtils.encryptConfig(config);
         log.info(
                 "Encrypt config: \n{}",
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
index 0859690feb..c667a0431e 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
@@ -19,8 +19,10 @@ package org.apache.seatunnel.core.starter.utils;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigParseOptions;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+import org.apache.seatunnel.shade.com.typesafe.config.impl.Parseable;
 
 import org.apache.seatunnel.api.configuration.ConfigAdapter;
 
@@ -29,7 +31,9 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
 /** Used to build the {@link Config} from config file. */
@@ -43,14 +47,11 @@ public class ConfigBuilder {
         // utility class and cannot be instantiated
     }
 
-    private static Config ofInner(@NonNull Path filePath) {
+    private static Config ofInner(@NonNull Path filePath, List<String> 
variables) {
         Config config =
                 ConfigFactory.parseFile(filePath.toFile())
-                        
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
-                        .resolveWith(
-                                ConfigFactory.systemProperties(),
-                                
ConfigResolveOptions.defaults().setAllowUnresolved(true));
-        return ConfigShadeUtils.decryptConfig(config);
+                        
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true));
+        return ConfigShadeUtils.decryptConfig(backfillUserVariables(config, 
variables));
     }
 
     public static Config of(@NonNull String filePath) {
@@ -58,13 +59,22 @@ public class ConfigBuilder {
         return of(path);
     }
 
+    public static Config of(@NonNull String filePath, List<String> variables) {
+        Path path = Paths.get(filePath);
+        return of(path, variables);
+    }
+
     public static Config of(@NonNull Path filePath) {
+        return of(filePath, null);
+    }
+
+    public static Config of(@NonNull Path filePath, List<String> variables) {
         log.info("Loading config file from path: {}", filePath);
         Optional<ConfigAdapter> adapterSupplier = 
ConfigAdapterUtils.selectAdapter(filePath);
         Config config =
                 adapterSupplier
-                        .map(adapter -> of(adapter, filePath))
-                        .orElseGet(() -> ofInner(filePath));
+                        .map(adapter -> of(adapter, filePath, variables))
+                        .orElseGet(() -> ofInner(filePath, variables));
         log.info("Parsed config file: \n{}", 
config.root().render(CONFIG_RENDER_OPTIONS));
         return config;
     }
@@ -88,17 +98,38 @@ public class ConfigBuilder {
         return config;
     }
 
-    public static Config of(@NonNull ConfigAdapter configAdapter, @NonNull 
Path filePath) {
+    public static Config of(
+            @NonNull ConfigAdapter configAdapter, @NonNull Path filePath, 
List<String> variables) {
         log.info("With config adapter spi {}", 
configAdapter.getClass().getName());
         try {
             Map<String, Object> flattenedMap = 
configAdapter.loadConfig(filePath);
             Config config = ConfigFactory.parseMap(flattenedMap);
-            return ConfigShadeUtils.decryptConfig(config);
+            return 
ConfigShadeUtils.decryptConfig(backfillUserVariables(config, variables));
         } catch (Exception warn) {
             log.warn(
                     "Loading config failed with spi {}, fallback to HOCON 
loader.",
                     configAdapter.getClass().getName());
-            return ofInner(filePath);
+            return ofInner(filePath, variables);
+        }
+    }
+
+    private static Config backfillUserVariables(Config config, List<String> 
variables) {
+        if (variables != null) {
+            variables.stream()
+                    .filter(Objects::nonNull)
+                    .map(variable -> variable.split("=", 2))
+                    .filter(pair -> pair.length == 2)
+                    .forEach(pair -> System.setProperty(pair[0], pair[1]));
+            Config systemConfig =
+                    Parseable.newProperties(
+                                    System.getProperties(),
+                                    ConfigParseOptions.defaults()
+                                            .setOriginDescription("system 
properties"))
+                            .parse()
+                            .toConfig();
+            return config.resolveWith(
+                    systemConfig, 
ConfigResolveOptions.defaults().setAllowUnresolved(true));
         }
+        return config;
     }
 }
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java
 
b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java
index e25781b910..9382c68663 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.starter.utils;
 
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
 
 import org.apache.seatunnel.api.configuration.ConfigShade;
 import org.apache.seatunnel.common.utils.JsonUtils;
@@ -67,6 +68,55 @@ public class ConfigShadeTest {
                 config.getConfigList("source").get(0).getString("password"), 
PASSWORD);
     }
 
+    @Test
+    public void testVariableReplacement() throws URISyntaxException {
+        String jobName = "seatunnel variable test job";
+        String resName = "fake";
+        int rowNum = 10;
+        String nameType = "string";
+        String username = "seatunnel=2.3.1";
+        String password = "$a^b%c.d~e0*9(";
+        String blankSpace = "2023-12-26 11:30:00";
+        List<String> variables = new ArrayList<>();
+        variables.add("jobName=" + jobName);
+        variables.add("resName=" + resName);
+        variables.add("rowNum=" + rowNum);
+        variables.add("strTemplate=[abc,de~,f h]");
+        variables.add("nameType=" + nameType);
+        variables.add("nameVal=abc");
+        variables.add("username=" + username);
+        variables.add("password=" + password);
+        variables.add("blankSpace=" + blankSpace);
+        URL resource = 
ConfigShadeTest.class.getResource("/config.variables.conf");
+        Assertions.assertNotNull(resource);
+        Config config = ConfigBuilder.of(Paths.get(resource.toURI()), 
variables);
+        Config envConfig = config.getConfig("env");
+        Assertions.assertEquals(envConfig.getString("job.name"), jobName);
+        List<? extends ConfigObject> sourceConfigs = 
config.getObjectList("source");
+        for (ConfigObject configObject : sourceConfigs) {
+            Config sourceConfig = configObject.toConfig();
+            List<String> list1 = sourceConfig.getStringList("string.template");
+            Assertions.assertEquals(list1.get(0), "abc");
+            Assertions.assertEquals(list1.get(1), "de~");
+            Assertions.assertEquals(list1.get(2), "f h");
+            Assertions.assertEquals(sourceConfig.getInt("row.num"), rowNum);
+            
Assertions.assertEquals(sourceConfig.getString("result_table_name"), resName);
+        }
+        List<? extends ConfigObject> transformConfigs = 
config.getObjectList("transform");
+        for (ConfigObject configObject : transformConfigs) {
+            Config transformConfig = configObject.toConfig();
+            Assertions.assertEquals(
+                    transformConfig.getString("query"), "select * from fake 
where name = 'abc' ");
+        }
+        List<? extends ConfigObject> sinkConfigs = 
config.getObjectList("sink");
+        for (ConfigObject sinkObject : sinkConfigs) {
+            Config sinkConfig = sinkObject.toConfig();
+            Assertions.assertEquals(sinkConfig.getString("username"), 
username);
+            Assertions.assertEquals(sinkConfig.getString("password"), 
password);
+            Assertions.assertEquals(sinkConfig.getString("blankSpace"), 
blankSpace);
+        }
+    }
+
     @Test
     public void testDecryptAndEncrypt() {
         String encryptUsername = ConfigShadeUtils.encryptOption("base64", 
USERNAME);
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/test/resources/config.variables.conf
 
b/seatunnel-core/seatunnel-core-starter/src/test/resources/config.variables.conf
new file mode 100644
index 0000000000..7355dcc29a
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-core-starter/src/test/resources/config.variables.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+  job.name = ${jobName}
+  parallelism = 2
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    result_table_name = ${resName}
+    row.num = ${rowNum}
+    string.template = ${strTemplate}
+    int.template = [20, 21]
+    schema = {
+      fields {
+        name = ${nameType}
+        age = "int"
+      }
+    }
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+transform {
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/category/transform-v2
+    sql {
+      source_table_name = "fake"
+      result_table_name = "sql"
+      query = "select * from "${resName}" where name = '"${nameVal}"' "
+    }
+
+}
+
+sink {
+  Console {
+     source_table_name = "sql"
+     username = ${username}
+     password = ${password}
+     blankSpace = ${blankSpace}
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index 5dc1d32cef..c244f2ff33 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -90,7 +90,7 @@ public class FlinkStarter implements Starter {
         flinkCommandArgs.getVariables().stream()
                 .filter(Objects::nonNull)
                 .map(String::trim)
-                .forEach(variable -> command.add("-D" + variable));
+                .forEach(variable -> command.add("-i " + variable));
         return command;
     }
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index 7373cb58ed..e74bbd402f 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -90,7 +90,7 @@ public class FlinkStarter implements Starter {
         flinkCommandArgs.getVariables().stream()
                 .filter(Objects::nonNull)
                 .map(String::trim)
-                .forEach(variable -> command.add("-D" + variable));
+                .forEach(variable -> command.add("-i " + variable));
         return command;
     }
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
index f8539af752..e831fb081b 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
@@ -48,7 +48,7 @@ public class FlinkTaskExecuteCommand implements 
Command<FlinkCommandArgs> {
     public void execute() throws CommandExecuteException {
         Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
         checkConfigExist(configFile);
-        Config config = ConfigBuilder.of(configFile);
+        Config config = ConfigBuilder.of(configFile, 
flinkCommandArgs.getVariables());
         // if user specified job name using command line arguments, override 
config option
         if (!flinkCommandArgs.getJobName().equals(Constants.LOGO)) {
             config =
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 1b8918976b..f2c20e8408 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -18,8 +18,6 @@
 package org.apache.seatunnel.core.starter.spark;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
 
 import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.config.Common;
@@ -129,40 +127,12 @@ public class SparkStarter implements Starter {
 
     /** parse spark configurations from SeaTunnel config file */
     private void setSparkConf() throws FileNotFoundException {
-        commandArgs.getVariables().stream()
-                .filter(Objects::nonNull)
-                .map(variable -> variable.split("=", 2))
-                .filter(pair -> pair.length == 2)
-                .forEach(pair -> System.setProperty(pair[0], pair[1]));
-        this.sparkConf = getSparkConf(commandArgs.getConfigFile());
-        String driverJavaOpts = 
this.sparkConf.getOrDefault("spark.driver.extraJavaOptions", "");
-        String executorJavaOpts =
-                this.sparkConf.getOrDefault("spark.executor.extraJavaOptions", 
"");
-        if (!commandArgs.getVariables().isEmpty()) {
-            String properties =
-                    commandArgs.getVariables().stream()
-                            .map(v -> "-D" + v)
-                            .collect(Collectors.joining(" "));
-            driverJavaOpts += " " + properties;
-            executorJavaOpts += " " + properties;
-            this.sparkConf.put("spark.driver.extraJavaOptions", 
driverJavaOpts.trim());
-            this.sparkConf.put("spark.executor.extraJavaOptions", 
executorJavaOpts.trim());
-        }
+        this.sparkConf = getSparkConf(commandArgs.getConfigFile(), 
commandArgs.getVariables());
     }
 
     /** Get spark configurations from SeaTunnel job config file. */
-    static Map<String, String> getSparkConf(String configFile) throws 
FileNotFoundException {
-        File file = new File(configFile);
-        if (!file.exists()) {
-            throw new FileNotFoundException("config file '" + file + "' does 
not exists!");
-        }
-        Config appConfig =
-                ConfigFactory.parseFile(file)
-                        
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
-                        .resolveWith(
-                                ConfigFactory.systemProperties(),
-                                
ConfigResolveOptions.defaults().setAllowUnresolved(true));
-
+    static Map<String, String> getSparkConf(String configFile, List<String> 
variables) {
+        Config appConfig = ConfigBuilder.of(configFile, variables);
         return appConfig.getConfig("env").entrySet().stream()
                 .collect(
                         Collectors.toMap(
@@ -175,7 +145,7 @@ public class SparkStarter implements Starter {
         if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) 
{
             return Collections.emptyList();
         }
-        Config config = ConfigBuilder.of(commandArgs.getConfigFile());
+        Config config = ConfigBuilder.of(commandArgs.getConfigFile(), 
commandArgs.getVariables());
         Set<URL> pluginJars = new HashSet<>();
         SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
                 new SeaTunnelSourcePluginDiscovery();
@@ -217,6 +187,10 @@ public class SparkStarter implements Starter {
         if (this.commandArgs.isCheckConfig()) {
             commands.add("--check");
         }
+        this.commandArgs.getVariables().stream()
+                .filter(Objects::nonNull)
+                .map(String::trim)
+                .forEach(variable -> commands.add("-i " + variable));
         return commands;
     }
 
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkStarterTest.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkStarterTest.java
index 41bff86d30..7189fb8a60 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkStarterTest.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkStarterTest.java
@@ -33,7 +33,7 @@ public class SparkStarterTest {
     public void testGetSparkConf() throws URISyntaxException, 
FileNotFoundException {
         URI uri = 
ClassLoader.getSystemResource("spark_application.conf").toURI();
         String file = new File(uri).toString();
-        Map<String, String> sparkConf = SparkStarter.getSparkConf(file);
+        Map<String, String> sparkConf = SparkStarter.getSparkConf(file, null);
         assertEquals("SeaTunnel", sparkConf.get("job.name"));
         assertEquals("1", sparkConf.get("spark.executor.cores"));
     }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index c33544873a..790a20191d 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -18,8 +18,6 @@
 package org.apache.seatunnel.core.starter.spark;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
 
 import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.config.Common;
@@ -129,40 +127,12 @@ public class SparkStarter implements Starter {
 
     /** parse spark configurations from SeaTunnel config file */
     private void setSparkConf() throws FileNotFoundException {
-        commandArgs.getVariables().stream()
-                .filter(Objects::nonNull)
-                .map(variable -> variable.split("=", 2))
-                .filter(pair -> pair.length == 2)
-                .forEach(pair -> System.setProperty(pair[0], pair[1]));
-        this.sparkConf = getSparkConf(commandArgs.getConfigFile());
-        String driverJavaOpts = 
this.sparkConf.getOrDefault("spark.driver.extraJavaOptions", "");
-        String executorJavaOpts =
-                this.sparkConf.getOrDefault("spark.executor.extraJavaOptions", 
"");
-        if (!commandArgs.getVariables().isEmpty()) {
-            String properties =
-                    commandArgs.getVariables().stream()
-                            .map(v -> "-D" + v)
-                            .collect(Collectors.joining(" "));
-            driverJavaOpts += " " + properties;
-            executorJavaOpts += " " + properties;
-            this.sparkConf.put("spark.driver.extraJavaOptions", 
driverJavaOpts.trim());
-            this.sparkConf.put("spark.executor.extraJavaOptions", 
executorJavaOpts.trim());
-        }
+        this.sparkConf = getSparkConf(commandArgs.getConfigFile(), 
commandArgs.getVariables());
     }
 
     /** Get spark configurations from SeaTunnel job config file. */
-    static Map<String, String> getSparkConf(String configFile) throws 
FileNotFoundException {
-        File file = new File(configFile);
-        if (!file.exists()) {
-            throw new FileNotFoundException("config file '" + file + "' does 
not exists!");
-        }
-        Config appConfig =
-                ConfigFactory.parseFile(file)
-                        
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
-                        .resolveWith(
-                                ConfigFactory.systemProperties(),
-                                
ConfigResolveOptions.defaults().setAllowUnresolved(true));
-
+    static Map<String, String> getSparkConf(String configFile, List<String> 
variables) {
+        Config appConfig = ConfigBuilder.of(configFile, variables);
         return appConfig.getConfig("env").entrySet().stream()
                 .collect(
                         Collectors.toMap(
@@ -175,7 +145,7 @@ public class SparkStarter implements Starter {
         if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) 
{
             return Collections.emptyList();
         }
-        Config config = ConfigBuilder.of(commandArgs.getConfigFile());
+        Config config = ConfigBuilder.of(commandArgs.getConfigFile(), 
commandArgs.getVariables());
         Set<URL> pluginJars = new HashSet<>();
         SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
                 new SeaTunnelSourcePluginDiscovery();
@@ -217,6 +187,10 @@ public class SparkStarter implements Starter {
         if (this.commandArgs.isCheckConfig()) {
             commands.add("--check");
         }
+        this.commandArgs.getVariables().stream()
+                .filter(Objects::nonNull)
+                .map(String::trim)
+                .forEach(variable -> commands.add("-i " + variable));
         return commands;
     }
 
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
index 1f5f4242c5..ea36bc0777 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
@@ -48,7 +48,7 @@ public class SparkTaskExecuteCommand implements 
Command<SparkCommandArgs> {
     public void execute() throws CommandExecuteException {
         Path configFile = FileUtils.getConfigPath(sparkCommandArgs);
         checkConfigExist(configFile);
-        Config config = ConfigBuilder.of(configFile);
+        Config config = ConfigBuilder.of(configFile, 
sparkCommandArgs.getVariables());
         if (!sparkCommandArgs.getJobName().equals(Constants.LOGO)) {
             config =
                     config.withValue(
diff --git 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index eefcc1a0a6..9504c9cb2c 100644
--- 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++ 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -37,7 +37,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
 
 @EqualsAndHashCode(callSuper = true)
 @Data
@@ -103,7 +102,6 @@ public class ClientCommandArgs extends AbstractCommandArgs {
     @Override
     public Command<?> buildCommand() {
         Common.setDeployMode(getDeployMode());
-        userParamsToSysEnv();
         if (checkConfig) {
             return new SeaTunnelConfValidateCommand(this);
         }
@@ -116,16 +114,6 @@ public class ClientCommandArgs extends AbstractCommandArgs 
{
         return new ClientExecuteCommand(this);
     }
 
-    private void userParamsToSysEnv() {
-        if (!this.variables.isEmpty()) {
-            variables.stream()
-                    .filter(Objects::nonNull)
-                    .map(variable -> variable.split("=", 2))
-                    .filter(pair -> pair.length == 2)
-                    .forEach(pair -> System.setProperty(pair[0], pair[1]));
-        }
-    }
-
     public DeployMode getDeployMode() {
         return DeployMode.CLIENT;
     }
diff --git 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index 980832f4de..d1e8b78009 100644
--- 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -134,13 +134,17 @@ public class ClientExecuteCommand implements 
Command<ClientCommandArgs> {
                     jobExecutionEnv =
                             engineClient.restoreExecutionContext(
                                     configFile.toString(),
+                                    clientCommandArgs.getVariables(),
                                     jobConfig,
                                     seaTunnelConfig,
                                     
Long.parseLong(clientCommandArgs.getRestoreJobId()));
                 } else {
                     jobExecutionEnv =
                             engineClient.createExecutionContext(
-                                    configFile.toString(), jobConfig, 
seaTunnelConfig);
+                                    configFile.toString(),
+                                    clientCommandArgs.getVariables(),
+                                    jobConfig,
+                                    seaTunnelConfig);
                 }
 
                 // get job start time
diff --git 
a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
 
b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
deleted file mode 100644
index 19fc639700..0000000000
--- 
a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.seatunnel.args;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
-
-import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Paths;
-import java.util.List;
-
-public class ClientCommandArgsTest {
-    @Test
-    public void testUserDefinedParamsCommand() throws URISyntaxException {
-        int fakeParallelism = 16;
-        String username = "seatunnel=2.3.1";
-        String password = "dsjr42=4wfskahdsd=w1chh";
-        String fakeSourceTable = "fake";
-        String fakeSinkTable = "sink";
-        String list = "[par1=20230829,par2=20230829]";
-        String blankSpace = "2023-12-26 11:30:00";
-        String[] args = {
-            "-c",
-            "/args/user_defined_params.conf",
-            "-e",
-            "local",
-            "-i",
-            "fake_source_table=" + fakeSourceTable,
-            "-i",
-            "fake_parallelism=" + fakeParallelism,
-            "-i",
-            "fake_sink_table=" + fakeSinkTable,
-            "-i",
-            "password=" + password,
-            "-i",
-            "username=" + username,
-            "-i",
-            "blankSpace=" + blankSpace,
-            "-i",
-            "list=" + list,
-            "-i",
-            "sql=" + "\"select a , b from fake_source_table\""
-        };
-        ClientCommandArgs clientCommandArgs =
-                CommandLineUtils.parse(args, new ClientCommandArgs(), 
"seatunnel-zeta", true);
-        clientCommandArgs.buildCommand();
-        URL resource = 
ClientCommandArgsTest.class.getResource("/args/user_defined_params.conf");
-
-        Config config =
-                ConfigFactory.parseFile(Paths.get(resource.toURI()).toFile())
-                        
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
-                        .resolveWith(
-                                ConfigFactory.systemProperties(),
-                                
ConfigResolveOptions.defaults().setAllowUnresolved(true));
-        List<? extends ConfigObject> sourceConfigs = 
config.getObjectList("source");
-        for (ConfigObject configObject : sourceConfigs) {
-            Config sourceConfig = configObject.toConfig();
-
-            String tableName = sourceConfig.getString("result_table_name");
-            Assertions.assertEquals(tableName, fakeSourceTable);
-
-            int parallelism = 
Integer.parseInt(sourceConfig.getString("parallelism"));
-            Assertions.assertEquals(fakeParallelism, parallelism);
-
-            Assertions.assertEquals(sourceConfig.getString("username"), 
username);
-            Assertions.assertEquals(sourceConfig.getString("password"), 
password);
-        }
-        List<? extends ConfigObject> sinkConfigs = 
config.getObjectList("sink");
-        for (ConfigObject sinkObject : sinkConfigs) {
-            Config sinkConfig = sinkObject.toConfig();
-            String tableName = sinkConfig.getString("result_table_name");
-            Assertions.assertEquals(tableName, fakeSinkTable);
-
-            Assertions.assertEquals(sinkConfig.getString("username"), 
username);
-            Assertions.assertEquals(sinkConfig.getString("password"), 
password);
-            List<String> list1 = sinkConfig.getStringList("list");
-            Assertions.assertEquals(list1.get(0), "par1=20230829");
-            Assertions.assertEquals(list1.get(1), "par2=20230829");
-            String sql = sinkConfig.getString("sql");
-            Assertions.assertEquals(sql, "\"select a , b from 
fake_source_table\"");
-            Assertions.assertEquals(sinkConfig.getString("blankSpace"), 
blankSpace);
-        }
-    }
-}
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
index 47e898f5ca..b033144d58 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
@@ -95,6 +95,12 @@ public abstract class AbstractTestContainer implements 
TestContainer {
 
     protected Container.ExecResult executeJob(GenericContainer<?> container, 
String confFile)
             throws IOException, InterruptedException {
+        return executeJob(container, confFile, null);
+    }
+
+    protected Container.ExecResult executeJob(
+            GenericContainer<?> container, String confFile, List<String> 
variables)
+            throws IOException, InterruptedException {
         final String confInContainerPath = 
copyConfigFileToContainer(container, confFile);
         // copy connectors
         copyConnectorJarToContainer(
@@ -110,7 +116,15 @@ public abstract class AbstractTestContainer implements 
TestContainer {
         command.add(adaptPathForWin(binPath));
         command.add("--config");
         command.add(adaptPathForWin(confInContainerPath));
-        command.addAll(getExtraStartShellCommands());
+        List<String> extraStartShellCommands = new 
ArrayList<>(getExtraStartShellCommands());
+        if (variables != null && !variables.isEmpty()) {
+            variables.forEach(
+                    v -> {
+                        extraStartShellCommands.add("-i");
+                        extraStartShellCommands.add(v);
+                    });
+        }
+        command.addAll(extraStartShellCommands);
         return executeCommand(container, command);
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
index e277a7a822..33b196eeba 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
@@ -23,6 +23,7 @@ import org.testcontainers.containers.Container;
 import org.testcontainers.containers.Network;
 
 import java.io.IOException;
+import java.util.List;
 
 public interface TestContainer extends TestResource {
 
@@ -35,6 +36,9 @@ public interface TestContainer extends TestResource {
 
     Container.ExecResult executeJob(String confFile) throws IOException, 
InterruptedException;
 
+    Container.ExecResult executeJob(String confFile, List<String> variables)
+            throws IOException, InterruptedException;
+
     default Container.ExecResult executeConnectorCheck(String[] args)
             throws IOException, InterruptedException {
         throw new UnsupportedOperationException("Not implemented");
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index 91320b3d9a..7145da6242 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -149,8 +149,14 @@ public abstract class AbstractTestFlinkContainer extends 
AbstractTestContainer {
     @Override
     public Container.ExecResult executeJob(String confFile)
             throws IOException, InterruptedException {
+        return executeJob(confFile, null);
+    }
+
+    @Override
+    public Container.ExecResult executeJob(String confFile, List<String> 
variables)
+            throws IOException, InterruptedException {
         log.info("test in container: {}", identifier());
-        return executeJob(jobManager, confFile);
+        return executeJob(jobManager, confFile, variables);
     }
 
     @Override
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
index 4e30d2c92b..4f5ea99029 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
@@ -220,8 +220,14 @@ public class ConnectorPackageServiceContainer extends 
AbstractTestContainer {
     @Override
     public Container.ExecResult executeJob(String confFile)
             throws IOException, InterruptedException {
+        return executeJob(confFile, null);
+    }
+
+    @Override
+    public Container.ExecResult executeJob(String confFile, List<String> 
variables)
+            throws IOException, InterruptedException {
         log.info("test in container: {}", identifier());
-        return executeJob(server1, confFile);
+        return executeJob(server1, confFile, variables);
     }
 
     @Override
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 37af5011ef..f9a139b040 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -267,10 +267,16 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
     @Override
     public Container.ExecResult executeJob(String confFile)
             throws IOException, InterruptedException {
+        return executeJob(confFile, null);
+    }
+
+    @Override
+    public Container.ExecResult executeJob(String confFile, List<String> 
variables)
+            throws IOException, InterruptedException {
         log.info("test in container: {}", identifier());
         List<String> beforeThreads = ContainerUtil.getJVMThreadNames(server);
         runningCount.incrementAndGet();
-        Container.ExecResult result = executeJob(server, confFile);
+        Container.ExecResult result = executeJob(server, confFile, variables);
         if (runningCount.decrementAndGet() > 0) {
             // only check thread when job all finished.
             return result;
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
index bd3c1aeebc..fe07d082af 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
@@ -101,10 +101,17 @@ public abstract class AbstractTestSparkContainer extends 
AbstractTestContainer {
         extendedFactory.extend(master);
     }
 
+    @Override
     public Container.ExecResult executeJob(String confFile)
             throws IOException, InterruptedException {
+        return executeJob(confFile, null);
+    }
+
+    @Override
+    public Container.ExecResult executeJob(String confFile, List<String> 
variables)
+            throws IOException, InterruptedException {
         log.info("test in container: {}", identifier());
-        return executeJob(master, confFile);
+        return executeJob(master, confFile, variables);
     }
 
     @Override
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UserVariableIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UserVariableIT.java
new file mode 100644
index 0000000000..03455af2b0
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UserVariableIT.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UserVariableIT extends TestSuiteBase {
+
+    @TestTemplate
+    public void userVariableTest(TestContainer container) throws IOException, 
InterruptedException {
+        List<String> variables = new ArrayList<>();
+        String list = "[abc,def]";
+        variables.add("resName=fake");
+        variables.add("rowNum=10");
+        variables.add("strTemplate=" + list);
+        variables.add("nameType=string");
+        variables.add("nameVal=abc");
+        variables.add("sourceTableName=sql");
+        Container.ExecResult execResult =
+                container.executeJob("/fake_to_console.variables.conf", 
variables);
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake_to_console.variables.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake_to_console.variables.conf
new file mode 100644
index 0000000000..48f7ec548b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake_to_console.variables.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+  parallelism = 2
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    result_table_name = ${resName}
+    row.num = ${rowNum}
+    string.template = ${strTemplate}
+    schema = {
+      fields {
+        name = ${nameType}
+        age = "int"
+      }
+    }
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+transform {
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/category/transform-v2
+    sql {
+      source_table_name = "fake"
+      query = "select * from "${resName}" where name = '"${nameVal}"' "
+      result_table_name = "sql"
+    }
+}
+
+sink {
+  Console {
+     source_table_name = ${sourceTableName}
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 22aa0ffd13..2d7508ee2d 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -36,6 +36,7 @@ import lombok.NonNull;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -53,18 +54,37 @@ public class SeaTunnelClient implements 
SeaTunnelClientInstance, AutoCloseable {
             @NonNull String filePath,
             @NonNull JobConfig jobConfig,
             @NonNull SeaTunnelConfig seaTunnelConfig) {
+        return createExecutionContext(filePath, null, jobConfig, 
seaTunnelConfig);
+    }
+
+    @Override
+    public ClientJobExecutionEnvironment createExecutionContext(
+            @NonNull String filePath,
+            List<String> variables,
+            @NonNull JobConfig jobConfig,
+            @NonNull SeaTunnelConfig seaTunnelConfig) {
         return new ClientJobExecutionEnvironment(
-                jobConfig, filePath, hazelcastClient, seaTunnelConfig);
+                jobConfig, filePath, variables, hazelcastClient, 
seaTunnelConfig);
+    }
+
+    @Override
+    public ClientJobExecutionEnvironment restoreExecutionContext(
+            @NonNull String filePath,
+            @NonNull JobConfig jobConfig,
+            @NonNull SeaTunnelConfig seaTunnelConfig,
+            @NonNull Long jobId) {
+        return restoreExecutionContext(filePath, null, jobConfig, 
seaTunnelConfig, jobId);
     }
 
     @Override
     public ClientJobExecutionEnvironment restoreExecutionContext(
             @NonNull String filePath,
+            List<String> variables,
             @NonNull JobConfig jobConfig,
             @NonNull SeaTunnelConfig seaTunnelConfig,
             @NonNull Long jobId) {
         return new ClientJobExecutionEnvironment(
-                jobConfig, filePath, hazelcastClient, seaTunnelConfig, true, 
jobId);
+                jobConfig, filePath, variables, hazelcastClient, 
seaTunnelConfig, true, jobId);
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
index 1f525ff73b..36a3f2e36e 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
@@ -24,6 +24,8 @@ import 
org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 
 import lombok.NonNull;
 
+import java.util.List;
+
 public interface SeaTunnelClientInstance {
 
     ClientJobExecutionEnvironment createExecutionContext(
@@ -31,8 +33,21 @@ public interface SeaTunnelClientInstance {
             @NonNull JobConfig config,
             @NonNull SeaTunnelConfig seaTunnelConfig);
 
+    ClientJobExecutionEnvironment createExecutionContext(
+            @NonNull String filePath,
+            List<String> variables,
+            @NonNull JobConfig config,
+            @NonNull SeaTunnelConfig seaTunnelConfig);
+
+    ClientJobExecutionEnvironment restoreExecutionContext(
+            @NonNull String filePath,
+            @NonNull JobConfig config,
+            @NonNull SeaTunnelConfig seaTunnelConfig,
+            @NonNull Long jobId);
+
     ClientJobExecutionEnvironment restoreExecutionContext(
             @NonNull String filePath,
+            List<String> variables,
             @NonNull JobConfig config,
             @NonNull SeaTunnelConfig seaTunnelConfig,
             @NonNull Long jobId);
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
index d92f9722dc..18f1a7376f 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
@@ -42,6 +42,8 @@ public class ClientJobExecutionEnvironment extends 
AbstractJobEnvironment {
 
     private final String jobFilePath;
 
+    private final List<String> variables;
+
     private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;
 
     private final JobClient jobClient;
@@ -54,12 +56,14 @@ public class ClientJobExecutionEnvironment extends 
AbstractJobEnvironment {
     public ClientJobExecutionEnvironment(
             JobConfig jobConfig,
             String jobFilePath,
+            List<String> variables,
             SeaTunnelHazelcastClient seaTunnelHazelcastClient,
             SeaTunnelConfig seaTunnelConfig,
             boolean isStartWithSavePoint,
             Long jobId) {
         super(jobConfig, isStartWithSavePoint);
         this.jobFilePath = jobFilePath;
+        this.variables = variables;
         this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
         this.jobClient = new JobClient(seaTunnelHazelcastClient);
         this.seaTunnelConfig = seaTunnelConfig;
@@ -71,16 +75,29 @@ public class ClientJobExecutionEnvironment extends 
AbstractJobEnvironment {
     public ClientJobExecutionEnvironment(
             JobConfig jobConfig,
             String jobFilePath,
+            List<String> variables,
             SeaTunnelHazelcastClient seaTunnelHazelcastClient,
             SeaTunnelConfig seaTunnelConfig) {
-        this(jobConfig, jobFilePath, seaTunnelHazelcastClient, 
seaTunnelConfig, false, null);
+        this(
+                jobConfig,
+                jobFilePath,
+                variables,
+                seaTunnelHazelcastClient,
+                seaTunnelConfig,
+                false,
+                null);
     }
 
     /** Search all jars in SEATUNNEL_HOME/plugins */
     @Override
     protected MultipleTableJobConfigParser getJobConfigParser() {
         return new MultipleTableJobConfigParser(
-                jobFilePath, idGenerator, jobConfig, commonPluginJars, 
isStartWithSavePoint);
+                jobFilePath,
+                variables,
+                idGenerator,
+                jobConfig,
+                commonPluginJars,
+                isStartWithSavePoint);
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 883c7c59fa..6e5fa3decd 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -123,11 +123,27 @@ public class MultipleTableJobConfigParser {
             JobConfig jobConfig,
             List<URL> commonPluginJars,
             boolean isStartWithSavePoint) {
+        this(
+                jobDefineFilePath,
+                null,
+                idGenerator,
+                jobConfig,
+                commonPluginJars,
+                isStartWithSavePoint);
+    }
+
+    public MultipleTableJobConfigParser(
+            String jobDefineFilePath,
+            List<String> variables,
+            IdGenerator idGenerator,
+            JobConfig jobConfig,
+            List<URL> commonPluginJars,
+            boolean isStartWithSavePoint) {
         this.idGenerator = idGenerator;
         this.jobConfig = jobConfig;
         this.commonPluginJars = commonPluginJars;
         this.isStartWithSavePoint = isStartWithSavePoint;
-        this.seaTunnelJobConfig = 
ConfigBuilder.of(Paths.get(jobDefineFilePath));
+        this.seaTunnelJobConfig = 
ConfigBuilder.of(Paths.get(jobDefineFilePath), variables);
         this.envOptions = 
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
         this.fallbackParser =
                 new JobConfigParser(idGenerator, commonPluginJars, 
isStartWithSavePoint);

Reply via email to