This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c226acca57 [Feature][Zeta] Configuration files support user variable
replacement (#4969)
c226acca57 is described below
commit c226acca574080b3faa49563cc19c1c2d2ccbcf8
Author: FuYouJ <[email protected]>
AuthorDate: Thu Jun 29 11:54:22 2023 +0800
[Feature][Zeta] Configuration files support user variable replacement
(#4969)
---
release-note.md | 1 +
.../starter/seatunnel/args/ClientCommandArgs.java | 12 +++
.../seatunnel/args/ClientCommandArgsTest.java | 93 ++++++++++++++++++++++
.../test/resources/args/user_defined_params.conf | 51 ++++++++++++
4 files changed, 157 insertions(+)
diff --git a/release-note.md b/release-note.md
index 1c09b9af26..cbf9b562a2 100644
--- a/release-note.md
+++ b/release-note.md
@@ -61,6 +61,7 @@
- [Zeta] Fix task `notifyTaskStatusToMaster` failed when job not running or
failed before run (#4847)
- [Zeta] Fix cpu load problem (#4828)
- [zeta] Fix the deadlock issue with JDBC driver loading (#4878)
+- [zeta] dynamically replace the value of the variable at runtime (#4950)
### E2E
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index 9504c9cb2c..eefcc1a0a6 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -37,6 +37,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
@EqualsAndHashCode(callSuper = true)
@Data
@@ -102,6 +103,7 @@ public class ClientCommandArgs extends AbstractCommandArgs {
@Override
public Command<?> buildCommand() {
Common.setDeployMode(getDeployMode());
+ userParamsToSysEnv();
if (checkConfig) {
return new SeaTunnelConfValidateCommand(this);
}
@@ -114,6 +116,16 @@ public class ClientCommandArgs extends AbstractCommandArgs
{
return new ClientExecuteCommand(this);
}
+ private void userParamsToSysEnv() { // copies every "key=value" entry of `variables` into the JVM system properties
+ if (!this.variables.isEmpty()) { // NOTE(review): assumes `variables` is never null; its declaration is not visible here — confirm
+ variables.stream()
+ .filter(Objects::nonNull) // skip null entries
+ .map(variable -> variable.split("=", 2)) // limit 2: split at the first '=' only, so the value part may itself contain '='
+ .filter(pair -> pair.length == 2) // entries without any '=' are silently dropped
+ .forEach(pair -> System.setProperty(pair[0], pair[1])); // NOTE(review): an entry starting with '=' gives an empty key, which System.setProperty rejects with IllegalArgumentException
+ }
+ }
+
public DeployMode getDeployMode() {
return DeployMode.CLIENT;
}
diff --git
a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
new file mode 100644
index 0000000000..5f197367d0
--- /dev/null
+++
b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.seatunnel.args;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.List;
+
+public class ClientCommandArgsTest { // checks that key=value arguments passed with -i become substitution values in a parsed job config
+ @Test
+ public void testUserDefinedParamsCommand() throws URISyntaxException {
+ int fakeParallelism = 16;
+ String username = "seatunnel=2.3.1"; // value contains '=' and must come through the key=value split unchanged
+ String password = "dsjr42=4wfskahdsd=w1chh"; // value contains several '='
+ String fakeSourceTable = "fake";
+ String fakeSinkTable = "sink";
+ String[] args = {
+ "-c",
+ "/args/user_defined_params.conf",
+ "-e",
+ "local",
+ "-i", // presumably each -i adds one key=value entry to ClientCommandArgs.variables — confirm against the argument declaration
+ "fake_source_table=" + fakeSourceTable,
+ "-i",
+ "fake_parallelism=" + fakeParallelism,
+ "-i",
+ "fake_sink_table=" + fakeSinkTable,
+ "-i",
+ "password=" + password,
+ "-i",
+ "username=" + username
+ };
+ ClientCommandArgs clientCommandArgs =
+ CommandLineUtils.parse(args, new ClientCommandArgs(),
"seatunnel-zeta", true);
+ clientCommandArgs.buildCommand(); // called for its side effect: buildCommand() runs userParamsToSysEnv(), which sets system properties; NOTE(review): they are never cleared after the test
+ URL resource =
ClientCommandArgsTest.class.getResource("/args/user_defined_params.conf");
+
+ Config config =
+ ConfigFactory.parseFile(Paths.get(resource.toURI()).toFile())
+
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+ .resolveWith(
+ ConfigFactory.systemProperties(), // the ${...} placeholders are resolved against JVM system properties
+
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+ List<? extends ConfigObject> sourceConfigs =
config.getObjectList("source");
+ for (ConfigObject configObject : sourceConfigs) {
+ Config sourceConfig = configObject.toConfig();
+
+ String tableName = sourceConfig.getString("result_table_name");
+ Assertions.assertEquals(tableName, fakeSourceTable); // NOTE(review): JUnit's assertEquals is (expected, actual); this and the username/password assertions below pass (actual, expected), which only affects the failure message
+
+ int parallelism =
Integer.parseInt(sourceConfig.getString("parallelism"));
+ Assertions.assertEquals(fakeParallelism, parallelism);
+
+ Assertions.assertEquals(sourceConfig.getString("username"),
username);
+ Assertions.assertEquals(sourceConfig.getString("password"),
password);
+ }
+ List<? extends ConfigObject> sinkConfigs =
config.getObjectList("sink");
+ for (ConfigObject sinkObject : sinkConfigs) { // same checks for every sink entry
+ Config sinkConfig = sinkObject.toConfig();
+ String tableName = sinkConfig.getString("result_table_name");
+ Assertions.assertEquals(tableName, fakeSinkTable);
+
+ Assertions.assertEquals(sinkConfig.getString("username"),
username);
+ Assertions.assertEquals(sinkConfig.getString("password"),
password);
+ }
+ }
+}
diff --git
a/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf
b/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf
new file mode 100644
index 0000000000..9dfde35dd6
--- /dev/null
+++
b/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 5000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ FakeSource {
+ result_table_name = ${fake_source_table}
+ parallelism = ${fake_parallelism}
+ username = ${username}
+ password = ${password}
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+sink {
+ console {
+ result_table_name = ${fake_sink_table}
+ username = ${username}
+ password = ${password}
+ }
+}
\ No newline at end of file