This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c226acca57 [Feature][Zeta] Configuration files support user variable 
replacement (#4969)
c226acca57 is described below

commit c226acca574080b3faa49563cc19c1c2d2ccbcf8
Author: FuYouJ <[email protected]>
AuthorDate: Thu Jun 29 11:54:22 2023 +0800

    [Feature][Zeta] Configuration files support user variable replacement 
(#4969)
---
 release-note.md                                    |  1 +
 .../starter/seatunnel/args/ClientCommandArgs.java  | 12 +++
 .../seatunnel/args/ClientCommandArgsTest.java      | 93 ++++++++++++++++++++++
 .../test/resources/args/user_defined_params.conf   | 51 ++++++++++++
 4 files changed, 157 insertions(+)

diff --git a/release-note.md b/release-note.md
index 1c09b9af26..cbf9b562a2 100644
--- a/release-note.md
+++ b/release-note.md
@@ -61,6 +61,7 @@
 - [Zeta] Fix task `notifyTaskStatusToMaster` failed when job not running or 
failed before run (#4847)
 - [Zeta] Fix cpu load problem (#4828)
 - [zeta] Fix the deadlock issue with JDBC driver loading (#4878)
+- [zeta] dynamically replace the value of the variable at runtime (#4950)
 
 ### E2E
 
diff --git 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index 9504c9cb2c..eefcc1a0a6 100644
--- 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++ 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -37,6 +37,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 @EqualsAndHashCode(callSuper = true)
 @Data
@@ -102,6 +103,7 @@ public class ClientCommandArgs extends AbstractCommandArgs {
     @Override
     public Command<?> buildCommand() {
         Common.setDeployMode(getDeployMode());
+        userParamsToSysEnv();
         if (checkConfig) {
             return new SeaTunnelConfValidateCommand(this);
         }
@@ -114,6 +116,16 @@ public class ClientCommandArgs extends AbstractCommandArgs 
{
         return new ClientExecuteCommand(this);
     }
 
+    private void userParamsToSysEnv() {
+        if (!this.variables.isEmpty()) {
+            variables.stream()
+                    .filter(Objects::nonNull)
+                    .map(variable -> variable.split("=", 2))
+                    .filter(pair -> pair.length == 2)
+                    .forEach(pair -> System.setProperty(pair[0], pair[1]));
+        }
+    }
+
     public DeployMode getDeployMode() {
         return DeployMode.CLIENT;
     }
diff --git 
a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
 
b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
new file mode 100644
index 0000000000..5f197367d0
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.seatunnel.args;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.List;
+
+public class ClientCommandArgsTest {
+    @Test
+    public void testUserDefinedParamsCommand() throws URISyntaxException {
+        int fakeParallelism = 16;
+        String username = "seatunnel=2.3.1";
+        String password = "dsjr42=4wfskahdsd=w1chh";
+        String fakeSourceTable = "fake";
+        String fakeSinkTable = "sink";
+        String[] args = {
+            "-c",
+            "/args/user_defined_params.conf",
+            "-e",
+            "local",
+            "-i",
+            "fake_source_table=" + fakeSourceTable,
+            "-i",
+            "fake_parallelism=" + fakeParallelism,
+            "-i",
+            "fake_sink_table=" + fakeSinkTable,
+            "-i",
+            "password=" + password,
+            "-i",
+            "username=" + username
+        };
+        ClientCommandArgs clientCommandArgs =
+                CommandLineUtils.parse(args, new ClientCommandArgs(), 
"seatunnel-zeta", true);
+        clientCommandArgs.buildCommand();
+        URL resource = 
ClientCommandArgsTest.class.getResource("/args/user_defined_params.conf");
+
+        Config config =
+                ConfigFactory.parseFile(Paths.get(resource.toURI()).toFile())
+                        
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+                        .resolveWith(
+                                ConfigFactory.systemProperties(),
+                                
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+        List<? extends ConfigObject> sourceConfigs = 
config.getObjectList("source");
+        for (ConfigObject configObject : sourceConfigs) {
+            Config sourceConfig = configObject.toConfig();
+
+            String tableName = sourceConfig.getString("result_table_name");
+            Assertions.assertEquals(tableName, fakeSourceTable);
+
+            int parallelism = 
Integer.parseInt(sourceConfig.getString("parallelism"));
+            Assertions.assertEquals(fakeParallelism, parallelism);
+
+            Assertions.assertEquals(sourceConfig.getString("username"), 
username);
+            Assertions.assertEquals(sourceConfig.getString("password"), 
password);
+        }
+        List<? extends ConfigObject> sinkConfigs = 
config.getObjectList("sink");
+        for (ConfigObject sinkObject : sinkConfigs) {
+            Config sinkConfig = sinkObject.toConfig();
+            String tableName = sinkConfig.getString("result_table_name");
+            Assertions.assertEquals(tableName, fakeSinkTable);
+
+            Assertions.assertEquals(sinkConfig.getString("username"), 
username);
+            Assertions.assertEquals(sinkConfig.getString("password"), 
password);
+        }
+    }
+}
diff --git 
a/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf
 
b/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf
new file mode 100644
index 0000000000..9dfde35dd6
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval = 5000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    result_table_name = ${fake_source_table}
+    parallelism = ${fake_parallelism}
+    username = ${username}
+    password = ${password}
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+sink {
+  console {
+     result_table_name = ${fake_sink_table}
+     username = ${username}
+     password = ${password}
+  }
+}
\ No newline at end of file

Reply via email to