This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ee6a9870 [Feature][Seatunnel-core-fline] Support Flink dynamic 
configurations (#1800)
ee6a9870 is described below

commit ee6a9870b877f7dc5998b87a32d46f69e637dafa
Author: legendtkl <[email protected]>
AuthorDate: Fri May 6 11:57:36 2022 +0800

    [Feature][Seatunnel-core-fline] Support Flink dynamic configurations (#1800)
    
    * [Feature][Seatunnel-core-fline] Support Flink dynamic configurations
    
    * fix failed ut
    
    * update doc
    
    * address review comment
    
    Co-authored-by: taokelu <[email protected]>
---
 docs/en/command/usage.mdx                          |  2 +
 .../seatunnel/core/base/config/ConfigParser.java   | 45 +++++++++++++++++++
 .../apache/seatunnel/core/sql/FlinkSqlStarter.java |  4 +-
 .../apache/seatunnel/core/flink/FlinkStarter.java  |  4 +-
 .../core/flink/utils/CommandLineUtils.java         | 13 +++++-
 .../seatunnel/core/flink/FlinkStarterTest.java     | 38 ++++++++--------
 .../core/flink/utils/CommandLineUtilsTest.java     | 20 +++++++--
 .../src/test/resources/app.conf                    | 50 ++++++++++++++++++++++
 .../apache/seatunnel/core/spark/SparkStarter.java  | 16 +------
 9 files changed, 148 insertions(+), 44 deletions(-)

diff --git a/docs/en/command/usage.mdx b/docs/en/command/usage.mdx
index 364ecd43..e7406d83 100644
--- a/docs/en/command/usage.mdx
+++ b/docs/en/command/usage.mdx
@@ -133,6 +133,8 @@ bin/start-seatunnel-flink.sh \
 
 This designation will replace `"${my_name}"` in the configuration file with 
`kid-xiong`
 
+> All the configurations in the `env` section are passed to Flink as dynamic 
parameters in `-Dkey=value` form, for example `-Dexecution.parallelism=1`.
+
 > For the rest of the parameters, refer to the original flink parameters. 
 > Check the flink parameter method: `bin/flink run -h` . The parameters can be 
 > added as needed. For example, `-m yarn-cluster` is specified as `on yarn` 
 > mode.
 
 ```bash
diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigParser.java
 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigParser.java
new file mode 100644
index 00000000..8886b628
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigParser.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.base.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ConfigParser {
+
+    public static Map<String, String> getConfigEnvValues(String configFile) 
throws FileNotFoundException {
+        File file = new File(configFile);
+        if (!file.exists()) {
+            throw new FileNotFoundException("config file '" + file + "' does 
not exists!");
+        }
+        Config appConfig = ConfigFactory.parseFile(file)
+            .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+            .resolveWith(ConfigFactory.systemProperties(), 
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+        return appConfig.getConfig("env")
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().unwrapped().toString()));
+    }
+}
diff --git 
a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
 
b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
index 1e5f52a4..313b966c 100644
--- 
a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
+++ 
b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
@@ -44,12 +44,12 @@ public class FlinkSqlStarter implements Starter {
     }
 
     @Override
-    public List<String> buildCommands() {
+    public List<String> buildCommands() throws Exception {
         return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, 
CLASS_NAME, appJar);
     }
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
-    public static void main(String[] args) {
+    public static void main(String[] args) throws Exception {
         FlinkSqlStarter flinkSqlStarter = new FlinkSqlStarter(args);
         System.out.println(String.join(" ", flinkSqlStarter.buildCommands()));
     }
diff --git 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
index 6fe18a49..2141dc86 100644
--- 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
+++ 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
@@ -51,13 +51,13 @@ public class FlinkStarter implements Starter {
     }
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
-    public static void main(String[] args) {
+    public static void main(String[] args) throws Exception {
         FlinkStarter flinkStarter = new FlinkStarter(args);
         System.out.println(String.join(" ", flinkStarter.buildCommands()));
     }
 
     @Override
-    public List<String> buildCommands() {
+    public List<String> buildCommands() throws Exception {
         return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, 
appJar);
     }
 
diff --git 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
index cdbf610a..d4d360d2 100644
--- 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
+++ 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
@@ -19,14 +19,18 @@ package org.apache.seatunnel.core.flink.utils;
 
 import static 
org.apache.seatunnel.core.flink.constant.FlinkConstant.USAGE_EXIT_CODE;
 
+import org.apache.seatunnel.core.base.config.ConfigParser;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.flink.config.FlinkJobType;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.UnixStyleUsageFormatter;
 
+import java.io.FileNotFoundException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 public class CommandLineUtils {
@@ -63,7 +67,7 @@ public class CommandLineUtils {
 
     }
 
-    public static List<String> buildFlinkCommand(FlinkCommandArgs 
flinkCommandArgs, String className, String jarPath) {
+    public static List<String> buildFlinkCommand(FlinkCommandArgs 
flinkCommandArgs, String className, String jarPath) throws 
FileNotFoundException {
         List<String> command = new ArrayList<>();
         command.add("${FLINK_HOME}/bin/flink");
         command.add(flinkCommandArgs.getRunMode().getMode());
@@ -81,6 +85,13 @@ public class CommandLineUtils {
           .filter(Objects::nonNull)
           .map(String::trim)
           .forEach(variable -> command.add("-D" + variable));
+
+        ConfigParser.getConfigEnvValues(flinkCommandArgs.getConfigFile())
+            .entrySet()
+            .stream()
+            .sorted(Comparator.comparing(Map.Entry::getKey))
+            .forEach(entry -> command.add("-D" + entry.getKey() + "=" + 
entry.getValue()));
+
         return command;
 
     }
diff --git 
a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
 
b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
index 02d23b3c..8662a804 100644
--- 
a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
+++ 
b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
@@ -17,49 +17,45 @@
 
 package org.apache.seatunnel.core.flink;
 
+import com.beust.jcommander.ParameterException;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class FlinkStarterTest {
+    static final String APP_CONF_PATH = 
ClassLoader.getSystemResource("app.conf").getPath();
 
     @Test
-    public void buildCommands() {
-        String[] args = {"--config", "test.conf", "-m", "yarn-cluster", "-i", 
"key1=value1", "-i", "key2=value2"};
+    public void buildCommands() throws Exception {
+        String[] args = {"--config", APP_CONF_PATH, "-m", "yarn-cluster", 
"-i", "key1=value1", "-i", "key2=value2"};
         FlinkStarter flinkStarter = new FlinkStarter(args);
         String flinkExecuteCommand = String.join(" ", 
flinkStarter.buildCommands());
         // since we cannot get the actual jar path, so we just check the 
command contains the command
-        Assert.assertTrue(flinkExecuteCommand.contains("--config test.conf"));
+        Assert.assertTrue(flinkExecuteCommand.contains("--config " + 
APP_CONF_PATH));
         Assert.assertTrue(flinkExecuteCommand.contains("-m yarn-cluster"));
         Assert.assertTrue(flinkExecuteCommand.contains("-Dkey1=value1"));
         
Assert.assertTrue(flinkExecuteCommand.contains("${FLINK_HOME}/bin/flink run"));
 
-        String[] args1 = {"--config", "test.conf", "-m", "yarn-cluster", "-i", 
"key1=value1", "-i", "key2=value2", "--run-mode", "run-application"};
+        String[] args1 = {"--config", APP_CONF_PATH, "-m", "yarn-cluster", 
"-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run-application"};
         flinkExecuteCommand = String.join(" ", new 
FlinkStarter(args1).buildCommands());
         
Assert.assertTrue(flinkExecuteCommand.contains("${FLINK_HOME}/bin/flink 
run-application"));
 
-        String[] args2 = {"--config", "test.conf", "-m", "yarn-cluster", "-i", 
"key1=value1", "-i", "key2=value2", "--run-mode", "run"};
+        String[] args2 = {"--config", APP_CONF_PATH, "-m", "yarn-cluster", 
"-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run"};
         flinkExecuteCommand = String.join(" ", new 
FlinkStarter(args2).buildCommands());
         
Assert.assertTrue(flinkExecuteCommand.contains("${FLINK_HOME}/bin/flink run"));
 
-        try {
-            String[] args3 = {"--config", "test.conf", "-m", "yarn-cluster", 
"-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run123"};
-            new FlinkStarter(args3);
-        } catch (Exception e) {
-            Assert.assertTrue(e instanceof IllegalArgumentException);
-            Assert.assertEquals("Run mode run123 not supported", 
e.getMessage());
-        }
+        String[] args3 = {"--config", APP_CONF_PATH, "-m", "yarn-cluster", 
"-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run123"};
+        Assert.assertThrows("Run mode run123 not supported", 
IllegalArgumentException.class, () -> new FlinkStarter(args3));
     }
 
     @Test
     public void buildCommandsMissingConfig() {
-        try {
-            String[] args = {"-m", "yarn-cluster", "-i", "key1=value1", "-i", 
"key2=value2"};
-            FlinkStarter flinkStarter = new FlinkStarter(args);
-            String flinkExecuteCommand = String.join(" ", 
flinkStarter.buildCommands());
-            // since we cannot get the actual jar path, so we just check the 
command contains the command
-            Assert.assertTrue(flinkExecuteCommand.contains("--config 
flink.yarn.conf"));
-        } catch (Exception e) {
-            Assert.assertEquals("The following option is required: [-c | 
--config]", e.getMessage());
-        }
+        Assert.assertThrows("The following option is required: [-c | 
--config]", ParameterException.class,
+            () -> {
+                String[] args = {"-m", "yarn-cluster", "-i", "key1=value1", 
"-i", "key2=value2"};
+                FlinkStarter flinkStarter = new FlinkStarter(args);
+                String flinkExecuteCommand = String.join(" ", 
flinkStarter.buildCommands());
+                // since we cannot get the actual jar path, so we just check 
the command contains the command
+                Assert.assertTrue(flinkExecuteCommand.contains("--config 
flink.yarn.conf"));
+            });
     }
 }
diff --git 
a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
 
b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
index 66f1ec69..8d879b1c 100644
--- 
a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
+++ 
b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
@@ -24,10 +24,12 @@ import org.apache.seatunnel.core.flink.config.FlinkRunMode;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.FileNotFoundException;
 import java.util.Arrays;
 import java.util.List;
 
 public class CommandLineUtilsTest {
+    static final String APP_CONF_PATH = 
ClassLoader.getSystemResource("app.conf").getPath();
 
     @Test
     public void testParseCommandArgs() {
@@ -47,20 +49,30 @@ public class CommandLineUtilsTest {
     }
 
     @Test
-    public void testBuildFlinkCommand() {
-        String[] args = {"--detached", "-c", "app.conf", "-t", "-i", 
"city=shenyang", "-i", "date=20200202",
+    public void testBuildFlinkCommand() throws FileNotFoundException {
+        String[] args = {"--detached", "-c", APP_CONF_PATH, "-t", "-i", 
"city=shenyang", "-i", "date=20200202",
             "-r", "run-application", "--unkown", "unkown-command"};
         FlinkCommandArgs flinkCommandArgs = 
CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
         List<String> commands = 
CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", 
"/path/to/jar");
         Assert.assertEquals(commands,
             Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", 
"--detached", "--unkown", "unkown-command", "-c",
-                "CLASS_NAME", "/path/to/jar", "--config", "app.conf", 
"--check", "-Dcity=shenyang", "-Ddate=20200202"));
+                "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, 
"--check", "-Dcity=shenyang", "-Ddate=20200202",
+                
"-Dexecution.checkpoint.data-uri=hdfs://localhost:9000/checkpoint", 
"-Dexecution.checkpoint.interval=10000",
+                "-Dexecution.parallelism=1"));
 
         flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, 
FlinkJobType.SQL);
         commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, 
"CLASS_NAME", "/path/to/jar");
         Assert.assertEquals(commands,
             Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", 
"--detached", "--unkown", "unkown-command", "-c",
-                "CLASS_NAME", "/path/to/jar", "--config", "app.conf", 
"--check", "-Dcity=shenyang", "-Ddate=20200202"));
+                "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, 
"--check", "-Dcity=shenyang", "-Ddate=20200202",
+                
"-Dexecution.checkpoint.data-uri=hdfs://localhost:9000/checkpoint", 
"-Dexecution.checkpoint.interval=10000",
+                "-Dexecution.parallelism=1"));
+
+        String[] args1 = {"--detached", "-c", "app.conf", "-t", "-i", 
"city=shenyang", "-i", "date=20200202",
+            "-r", "run-application", "--unkown", "unkown-command"};
 
+        Assert.assertThrows(FileNotFoundException.class, () -> {
+            
CommandLineUtils.buildFlinkCommand(CommandLineUtils.parseCommandArgs(args1, 
FlinkJobType.SQL), "CLASS_NAME", "/path/to/jar");
+        });
     }
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/resources/app.conf 
b/seatunnel-core/seatunnel-core-flink/src/test/resources/app.conf
new file mode 100644
index 00000000..5d937807
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/test/resources/app.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  execution.checkpoint.interval = 10000
+  execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example input plugin **only for testing and demonstrating the 
input plugin feature**
+    FakeSourceStream {
+      result_table_name = "fake"
+      field_name = "name,age"
+    }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of input plugins,
+  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+    sql {
+      sql = "select name,age from fake"
+    }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of filter plugins,
+  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  ConsoleSink {}
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of output plugins,
+  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
diff --git 
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
 
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index 81f44ba9..fd7e12d2 100644
--- 
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++ 
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -25,14 +25,13 @@ import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.base.Starter;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.config.ConfigParser;
 import org.apache.seatunnel.core.base.config.EngineType;
 import org.apache.seatunnel.core.base.config.PluginFactory;
 import org.apache.seatunnel.core.base.utils.CompressionUtils;
 import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.UnixStyleUsageFormatter;
@@ -181,18 +180,7 @@ public class SparkStarter implements Starter {
      * Get spark configurations from SeaTunnel job config file.
      */
     static Map<String, String> getSparkConf(String configFile) throws 
FileNotFoundException {
-        File file = new File(configFile);
-        if (!file.exists()) {
-            throw new FileNotFoundException("config file '" + file + "' does 
not exists!");
-        }
-        Config appConfig = ConfigFactory.parseFile(file)
-                
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
-                .resolveWith(ConfigFactory.systemProperties(), 
ConfigResolveOptions.defaults().setAllowUnresolved(true));
-
-        return appConfig.getConfig("env")
-                .entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().unwrapped().toString()));
+        return ConfigParser.getConfigEnvValues(configFile);
     }
 
     /**

Reply via email to