This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ee6a9870 [Feature][Seatunnel-core-fline] Support Flink dynamic
configurations (#1800)
ee6a9870 is described below
commit ee6a9870b877f7dc5998b87a32d46f69e637dafa
Author: legendtkl <[email protected]>
AuthorDate: Fri May 6 11:57:36 2022 +0800
[Feature][Seatunnel-core-fline] Support Flink dynamic configurations (#1800)
* [Feature][Seatunnel-core-fline] Support Flink dynamic configurations
* fix failed ut
* update doc
* address review comment
Co-authored-by: taokelu <[email protected]>
---
docs/en/command/usage.mdx | 2 +
.../seatunnel/core/base/config/ConfigParser.java | 45 +++++++++++++++++++
.../apache/seatunnel/core/sql/FlinkSqlStarter.java | 4 +-
.../apache/seatunnel/core/flink/FlinkStarter.java | 4 +-
.../core/flink/utils/CommandLineUtils.java | 13 +++++-
.../seatunnel/core/flink/FlinkStarterTest.java | 38 ++++++++--------
.../core/flink/utils/CommandLineUtilsTest.java | 20 +++++++--
.../src/test/resources/app.conf | 50 ++++++++++++++++++++++
.../apache/seatunnel/core/spark/SparkStarter.java | 16 +------
9 files changed, 148 insertions(+), 44 deletions(-)
diff --git a/docs/en/command/usage.mdx b/docs/en/command/usage.mdx
index 364ecd43..e7406d83 100644
--- a/docs/en/command/usage.mdx
+++ b/docs/en/command/usage.mdx
@@ -133,6 +133,8 @@ bin/start-seatunnel-flink.sh \
This designation will replace `"${my_name}"` in the configuration file with
`kid-xiong`
+> All the configurations in the `env` section will be applied to Flink dynamic parameters with the format of `-D`, such as `-Dexecution.parallelism=1` .
+
> For the rest of the parameters, refer to the original flink parameters.
> Check the flink parameter method: `bin/flink run -h` . The parameters can be
> added as needed. For example, `-m yarn-cluster` is specified as `on yarn`
> mode.
```bash
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigParser.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigParser.java
new file mode 100644
index 00000000..8886b628
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigParser.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.base.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ConfigParser {
+
+ public static Map<String, String> getConfigEnvValues(String configFile) throws FileNotFoundException {
+ File file = new File(configFile);
+ if (!file.exists()) {
+ throw new FileNotFoundException("config file '" + file + "' does not exists!");
+ }
+ Config appConfig = ConfigFactory.parseFile(file)
+ .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+ .resolveWith(ConfigFactory.systemProperties(), ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+ return appConfig.getConfig("env")
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().unwrapped().toString()));
+ }
+}
diff --git
a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
index 1e5f52a4..313b966c 100644
---
a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
+++
b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
@@ -44,12 +44,12 @@ public class FlinkSqlStarter implements Starter {
}
@Override
- public List<String> buildCommands() {
+ public List<String> buildCommands() throws Exception {
return CommandLineUtils.buildFlinkCommand(flinkCommandArgs,
CLASS_NAME, appJar);
}
@SuppressWarnings("checkstyle:RegexpSingleline")
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
FlinkSqlStarter flinkSqlStarter = new FlinkSqlStarter(args);
System.out.println(String.join(" ", flinkSqlStarter.buildCommands()));
}
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
index 6fe18a49..2141dc86 100644
---
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
@@ -51,13 +51,13 @@ public class FlinkStarter implements Starter {
}
@SuppressWarnings("checkstyle:RegexpSingleline")
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
FlinkStarter flinkStarter = new FlinkStarter(args);
System.out.println(String.join(" ", flinkStarter.buildCommands()));
}
@Override
- public List<String> buildCommands() {
+ public List<String> buildCommands() throws Exception {
return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME,
appJar);
}
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
index cdbf610a..d4d360d2 100644
---
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
@@ -19,14 +19,18 @@ package org.apache.seatunnel.core.flink.utils;
import static
org.apache.seatunnel.core.flink.constant.FlinkConstant.USAGE_EXIT_CODE;
+import org.apache.seatunnel.core.base.config.ConfigParser;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.config.FlinkJobType;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.UnixStyleUsageFormatter;
+import java.io.FileNotFoundException;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
public class CommandLineUtils {
@@ -63,7 +67,7 @@ public class CommandLineUtils {
}
- public static List<String> buildFlinkCommand(FlinkCommandArgs
flinkCommandArgs, String className, String jarPath) {
+ public static List<String> buildFlinkCommand(FlinkCommandArgs flinkCommandArgs, String className, String jarPath) throws FileNotFoundException {
List<String> command = new ArrayList<>();
command.add("${FLINK_HOME}/bin/flink");
command.add(flinkCommandArgs.getRunMode().getMode());
@@ -81,6 +85,13 @@ public class CommandLineUtils {
.filter(Objects::nonNull)
.map(String::trim)
.forEach(variable -> command.add("-D" + variable));
+
+ ConfigParser.getConfigEnvValues(flinkCommandArgs.getConfigFile())
+ .entrySet()
+ .stream()
+ .sorted(Comparator.comparing(Map.Entry::getKey))
+ .forEach(entry -> command.add("-D" + entry.getKey() + "=" + entry.getValue()));
+
return command;
}
diff --git
a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
index 02d23b3c..8662a804 100644
---
a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
+++
b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
@@ -17,49 +17,45 @@
package org.apache.seatunnel.core.flink;
+import com.beust.jcommander.ParameterException;
import org.junit.Assert;
import org.junit.Test;
public class FlinkStarterTest {
+ static final String APP_CONF_PATH =
ClassLoader.getSystemResource("app.conf").getPath();
@Test
- public void buildCommands() {
- String[] args = {"--config", "test.conf", "-m", "yarn-cluster", "-i",
"key1=value1", "-i", "key2=value2"};
+ public void buildCommands() throws Exception {
+ String[] args = {"--config", APP_CONF_PATH, "-m", "yarn-cluster",
"-i", "key1=value1", "-i", "key2=value2"};
FlinkStarter flinkStarter = new FlinkStarter(args);
String flinkExecuteCommand = String.join(" ",
flinkStarter.buildCommands());
// since we cannot get the actual jar path, so we just check the
command contains the command
- Assert.assertTrue(flinkExecuteCommand.contains("--config test.conf"));
+ Assert.assertTrue(flinkExecuteCommand.contains("--config " +
APP_CONF_PATH));
Assert.assertTrue(flinkExecuteCommand.contains("-m yarn-cluster"));
Assert.assertTrue(flinkExecuteCommand.contains("-Dkey1=value1"));
Assert.assertTrue(flinkExecuteCommand.contains("${FLINK_HOME}/bin/flink run"));
- String[] args1 = {"--config", "test.conf", "-m", "yarn-cluster", "-i",
"key1=value1", "-i", "key2=value2", "--run-mode", "run-application"};
+ String[] args1 = {"--config", APP_CONF_PATH, "-m", "yarn-cluster",
"-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run-application"};
flinkExecuteCommand = String.join(" ", new
FlinkStarter(args1).buildCommands());
Assert.assertTrue(flinkExecuteCommand.contains("${FLINK_HOME}/bin/flink
run-application"));
- String[] args2 = {"--config", "test.conf", "-m", "yarn-cluster", "-i",
"key1=value1", "-i", "key2=value2", "--run-mode", "run"};
+ String[] args2 = {"--config", APP_CONF_PATH, "-m", "yarn-cluster",
"-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run"};
flinkExecuteCommand = String.join(" ", new
FlinkStarter(args2).buildCommands());
Assert.assertTrue(flinkExecuteCommand.contains("${FLINK_HOME}/bin/flink run"));
- try {
- String[] args3 = {"--config", "test.conf", "-m", "yarn-cluster",
"-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run123"};
- new FlinkStarter(args3);
- } catch (Exception e) {
- Assert.assertTrue(e instanceof IllegalArgumentException);
- Assert.assertEquals("Run mode run123 not supported",
e.getMessage());
- }
+ String[] args3 = {"--config", APP_CONF_PATH, "-m", "yarn-cluster",
"-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run123"};
+ Assert.assertThrows("Run mode run123 not supported",
IllegalArgumentException.class, () -> new FlinkStarter(args3));
}
@Test
public void buildCommandsMissingConfig() {
- try {
- String[] args = {"-m", "yarn-cluster", "-i", "key1=value1", "-i",
"key2=value2"};
- FlinkStarter flinkStarter = new FlinkStarter(args);
- String flinkExecuteCommand = String.join(" ",
flinkStarter.buildCommands());
- // since we cannot get the actual jar path, so we just check the
command contains the command
- Assert.assertTrue(flinkExecuteCommand.contains("--config
flink.yarn.conf"));
- } catch (Exception e) {
- Assert.assertEquals("The following option is required: [-c |
--config]", e.getMessage());
- }
+ Assert.assertThrows("The following option is required: [-c |
--config]", ParameterException.class,
+ () -> {
+ String[] args = {"-m", "yarn-cluster", "-i", "key1=value1",
"-i", "key2=value2"};
+ FlinkStarter flinkStarter = new FlinkStarter(args);
+ String flinkExecuteCommand = String.join(" ",
flinkStarter.buildCommands());
+ // since we cannot get the actual jar path, so we just check
the command contains the command
+ Assert.assertTrue(flinkExecuteCommand.contains("--config
flink.yarn.conf"));
+ });
}
}
diff --git
a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
index 66f1ec69..8d879b1c 100644
---
a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
+++
b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
@@ -24,10 +24,12 @@ import org.apache.seatunnel.core.flink.config.FlinkRunMode;
import org.junit.Assert;
import org.junit.Test;
+import java.io.FileNotFoundException;
import java.util.Arrays;
import java.util.List;
public class CommandLineUtilsTest {
+ static final String APP_CONF_PATH =
ClassLoader.getSystemResource("app.conf").getPath();
@Test
public void testParseCommandArgs() {
@@ -47,20 +49,30 @@ public class CommandLineUtilsTest {
}
@Test
- public void testBuildFlinkCommand() {
- String[] args = {"--detached", "-c", "app.conf", "-t", "-i",
"city=shenyang", "-i", "date=20200202",
+ public void testBuildFlinkCommand() throws FileNotFoundException {
+ String[] args = {"--detached", "-c", APP_CONF_PATH, "-t", "-i",
"city=shenyang", "-i", "date=20200202",
"-r", "run-application", "--unkown", "unkown-command"};
FlinkCommandArgs flinkCommandArgs =
CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
List<String> commands =
CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME",
"/path/to/jar");
Assert.assertEquals(commands,
Arrays.asList("${FLINK_HOME}/bin/flink", "run-application",
"--detached", "--unkown", "unkown-command", "-c",
- "CLASS_NAME", "/path/to/jar", "--config", "app.conf",
"--check", "-Dcity=shenyang", "-Ddate=20200202"));
+ "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH,
"--check", "-Dcity=shenyang", "-Ddate=20200202",
+
"-Dexecution.checkpoint.data-uri=hdfs://localhost:9000/checkpoint",
"-Dexecution.checkpoint.interval=10000",
+ "-Dexecution.parallelism=1"));
flinkCommandArgs = CommandLineUtils.parseCommandArgs(args,
FlinkJobType.SQL);
commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs,
"CLASS_NAME", "/path/to/jar");
Assert.assertEquals(commands,
Arrays.asList("${FLINK_HOME}/bin/flink", "run-application",
"--detached", "--unkown", "unkown-command", "-c",
- "CLASS_NAME", "/path/to/jar", "--config", "app.conf",
"--check", "-Dcity=shenyang", "-Ddate=20200202"));
+ "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH,
"--check", "-Dcity=shenyang", "-Ddate=20200202",
+
"-Dexecution.checkpoint.data-uri=hdfs://localhost:9000/checkpoint",
"-Dexecution.checkpoint.interval=10000",
+ "-Dexecution.parallelism=1"));
+
+ String[] args1 = {"--detached", "-c", "app.conf", "-t", "-i",
"city=shenyang", "-i", "date=20200202",
+ "-r", "run-application", "--unkown", "unkown-command"};
+ Assert.assertThrows(FileNotFoundException.class, () -> {
+
CommandLineUtils.buildFlinkCommand(CommandLineUtils.parseCommandArgs(args1,
FlinkJobType.SQL), "CLASS_NAME", "/path/to/jar");
+ });
}
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/resources/app.conf
b/seatunnel-core/seatunnel-core-flink/src/test/resources/app.conf
new file mode 100644
index 00000000..5d937807
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/test/resources/app.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ execution.checkpoint.interval = 10000
+ execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example input plugin **only for test and demonstrate the feature input plugin**
+ FakeSourceStream {
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of input plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of filter plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ ConsoleSink {}
+
+ # If you would like to get more information about how to configure seatunnel and see full list of output plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
diff --git
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index 81f44ba9..fd7e12d2 100644
---
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -25,14 +25,13 @@ import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.base.Starter;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.config.ConfigParser;
import org.apache.seatunnel.core.base.config.EngineType;
import org.apache.seatunnel.core.base.config.PluginFactory;
import org.apache.seatunnel.core.base.utils.CompressionUtils;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.UnixStyleUsageFormatter;
@@ -181,18 +180,7 @@ public class SparkStarter implements Starter {
* Get spark configurations from SeaTunnel job config file.
*/
static Map<String, String> getSparkConf(String configFile) throws
FileNotFoundException {
- File file = new File(configFile);
- if (!file.exists()) {
- throw new FileNotFoundException("config file '" + file + "' does
not exists!");
- }
- Config appConfig = ConfigFactory.parseFile(file)
-
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
- .resolveWith(ConfigFactory.systemProperties(),
ConfigResolveOptions.defaults().setAllowUnresolved(true));
-
- return appConfig.getConfig("env")
- .entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().unwrapped().toString()));
+ return ConfigParser.getConfigEnvValues(configFile);
}
/**