This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 4a3c3e7797 [feature][task-flink] Support Flink application mode (#9577)
4a3c3e7797 is described below
commit 4a3c3e779781b432e43cc96ab6a729c060e084d7
Author: Paul Zhang <[email protected]>
AuthorDate: Fri Jul 1 12:20:38 2022 +0800
[feature][task-flink] Support Flink application mode (#9577)
---
docs/docs/en/guide/task/flink.md | 30 +-
docs/docs/zh/guide/task/flink.md | 2 +-
.../plugin/task/flink/FileUtils.java | 103 +++++++
.../plugin/task/flink/FlinkArgsUtils.java | 271 ++++++++++++++++++
.../plugin/task/flink/FlinkConstants.java | 14 +-
.../plugin/task/flink/FlinkDeployMode.java | 32 +++
.../plugin/task/flink/FlinkParameters.java | 8 +-
.../plugin/task/flink/FlinkTask.java | 307 +--------------------
.../plugin/task/flink/FlinkArgsUtilsTest.java | 132 +++++++++
.../plugin/task/flink/FlinkTaskTest.java | 116 --------
.../task/components/node/fields/use-flink.ts | 65 ++++-
11 files changed, 641 insertions(+), 439 deletions(-)
diff --git a/docs/docs/en/guide/task/flink.md b/docs/docs/en/guide/task/flink.md
index 821e16695b..44c9461803 100644
--- a/docs/docs/en/guide/task/flink.md
+++ b/docs/docs/en/guide/task/flink.md
@@ -30,7 +30,7 @@ Flink task type, used to execute Flink programs. For Flink
nodes:
| Program type | Support Java, Scala, Python and SQL four languages. |
| Class of main function**: The **full path** of Main Class, the entry point
of the Flink program. |
| Main jar package | The jar package of the Flink program (upload by Resource
Center). |
-| Deployment mode | Support 2 deployment modes: cluster and local. |
+| Deployment mode | Support 3 deployment modes: cluster, local and application
(Flink 1.11 and later. See also [Run an application in Application
Mode](https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode)).
|
| Initialization script | Script file to initialize session context. |
| Script | The sql script file developed by the user that should be executed. |
| Flink version | Select version according to the execution environment. |
@@ -45,6 +45,34 @@ Flink task type, used to execute Flink programs. For Flink
nodes:
| Resource | Appoint resource files in the `Resource` if parameters refer to
them. |
| Custom parameter | It is a local user-defined parameter for Flink, and will
replace the content with `${variable}` in the script. |
| Predecessor task | Selecting a predecessor task for the current task, will
set the selected predecessor task as upstream of the current task. |
+- **Node name**: The node name in a workflow definition is unique.
+- **Run flag**: Identifies whether this node schedules normally, if it does
not need to execute, select the `prohibition execution`.
+- **Descriptive information**: Describe the function of the node.
+- **Task priority**: When the number of worker threads is insufficient,
execute in the order of priority from high to low, and tasks with the same
priority will execute in a first-in first-out order.
+- **Worker grouping**: Assign tasks to the machines of the worker group to
execute. If `Default` is selected, randomly select a worker machine for
execution.
+- **Environment Name**: Configure the environment name in which run the script.
+- **Times of failed retry attempts**: The number of times the task failed to
resubmit.
+- **Failed retry interval**: The time interval (unit minute) for resubmitting
the task after a failed task.
+- **Delayed execution time**: The time (unit minute) that a task delays in
execution.
+- **Timeout alarm**: Check the timeout alarm and timeout failure. When the
task runs exceed the "timeout", an alarm email will send and the task execution
will fail.
+- **Program type**: Support Java, Scala, Python and SQL four languages.
+- **The class of main function**: The **full path** of Main Class, the entry
point of the Flink program.
+- **Main jar package**: The jar package of the Flink program (upload by
Resource Center).
+- **Deployment mode**: Support 3 deployment modes: cluster, local and
application (Flink 1.11 and later. See also [Run an application in Application
Mode](https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode)).
+- **Initialization script**: Script file to initialize session context.
+- **Script**: The sql script file developed by the user that should be
executed.
+- **Flink version**: Select version according to the execution env.
+- **Task name** (optional): Flink task name.
+- **JobManager memory size**: Used to set the size of jobManager memories,
which can be set according to the actual production environment.
+- **Number of slots**: Used to set the number of slots, which can be set
according to the actual production environment.
+- **TaskManager memory size**: Used to set the size of taskManager memories,
which can be set according to the actual production environment.
+- **Number of TaskManager**: Used to set the number of taskManagers, which can
be set according to the actual production environment.
+- **Parallelism**: Used to set the degree of parallelism for executing Flink
tasks.
+- **Main program parameters**: Set the input parameters for the Flink program
and support the substitution of custom parameter variables.
+- **Optional parameters**: Support `--jar`, `--files`,` --archives`, `--conf`
format.
+- **Resource**: Appoint resource files in the `Resource` if parameters refer
to them.
+- **Custom parameter**: It is a local user-defined parameter for Flink, and
will replace the content with `${variable}` in the script.
+- **Predecessor task**: Selecting a predecessor task for the current task,
will set the selected predecessor task as upstream of the current task.
## Task Example
diff --git a/docs/docs/zh/guide/task/flink.md b/docs/docs/zh/guide/task/flink.md
index 212f6ee208..2cfb9b4b6d 100644
--- a/docs/docs/zh/guide/task/flink.md
+++ b/docs/docs/zh/guide/task/flink.md
@@ -28,7 +28,7 @@ Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点:
- 程序类型:支持 Java、Scala、 Python 和 SQL 四种语言。
- 主函数的 Class:Flink 程序的入口 Main Class 的**全路径**。
- 主程序包:执行 Flink 程序的 jar 包(通过资源中心上传)。
-- 部署方式:支持 cluster 和 local 两种模式的部署。
+- 部署方式:支持 cluster、 local 和 application (Flink 1.11和之后的版本支持,参见 [Run an
application in Application
Mode](https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode))
三种模式的部署。
- 初始化脚本:用于初始化会话上下文的脚本文件。
- 脚本:用户开发的应该执行的 SQL 脚本文件。
- Flink 版本:根据所需环境选择对应的版本即可。
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FileUtils.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FileUtils.java
new file mode 100644
index 0000000000..33f0fecbfb
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FileUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink;
+
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.Set;
+
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
+
+public class FileUtils {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileUtils.class);
+ private FileUtils() {}
+
+ public static String getInitScriptFilePath(TaskExecutionContext
taskExecutionContext) {
+ return String.format("%s/%s_init.sql",
taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
+ }
+
+ public static String getScriptFilePath(TaskExecutionContext
taskExecutionContext) {
+ return String.format("%s/%s_node.sql",
taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
+ }
+
+ public static void generateScriptFile(TaskExecutionContext
taskExecutionContext, FlinkParameters flinkParameters) {
+ String initScriptFilePath =
FileUtils.getInitScriptFilePath(taskExecutionContext);
+ String scriptFilePath =
FileUtils.getScriptFilePath(taskExecutionContext);
+ String initOptionsString = StringUtils.join(
+ FlinkArgsUtils.buildInitOptionsForSql(flinkParameters),
+ FlinkConstants.FLINK_SQL_NEWLINE
+ ).concat(FlinkConstants.FLINK_SQL_NEWLINE);
+ writeScriptFile(initScriptFilePath, initOptionsString +
flinkParameters.getInitScript());
+ writeScriptFile(scriptFilePath, flinkParameters.getRawScript());
+ }
+
+ private static void writeScriptFile(String scriptFileFullPath, String
script) {
+ File scriptFile = new File(scriptFileFullPath);
+ Path path = scriptFile.toPath();
+ if (Files.exists(path)) {
+ try {
+ Files.delete(path);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("Flink Script file
exists in path: %s before creation and cannot be deleted", path), e);
+ }
+ }
+
+ Set<PosixFilePermission> perms =
PosixFilePermissions.fromString(RWXR_XR_X);
+ FileAttribute<Set<PosixFilePermission>> attr =
PosixFilePermissions.asFileAttribute(perms);
+ try {
+ if (SystemUtils.IS_OS_WINDOWS) {
+ Files.createFile(path);
+ } else {
+ if (!scriptFile.getParentFile().exists()) {
+ scriptFile.getParentFile().mkdirs();
+ }
+ Files.createFile(path, attr);
+ }
+
+ if (StringUtils.isNotEmpty(script)) {
+ String replacedScript = script.replaceAll("\\r\\n", "\n");
+ FileUtils.writeStringToFile(scriptFile, replacedScript,
StandardOpenOption.APPEND);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Generate flink SQL script error", e);
+ }
+ }
+
+ private static void writeStringToFile(File file, String content,
StandardOpenOption standardOpenOption) {
+ try {
+ LOGGER.info("Writing content: " + content);
+ LOGGER.info("To file: " + file.getAbsolutePath());
+ Files.write(file.getAbsoluteFile().toPath(),
content.getBytes(StandardCharsets.UTF_8), standardOpenOption);
+ } catch(IOException e) {
+ throw new RuntimeException("Error writing file: " +
file.getAbsoluteFile(), e);
+ }
+ }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
new file mode 100644
index 0000000000..53f94ea0d1
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * flink args utils
+ */
+public class FlinkArgsUtils {
+
+ private FlinkArgsUtils() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ private static final String LOCAL_DEPLOY_MODE = "local";
+ private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
+ private static final String FLINK_VERSION_AFTER_OR_EQUALS_1_12 = ">=1.12";
+ private static final String FLINK_VERSION_AFTER_OR_EQUALS_1_13 = ">=1.13";
+ /**
+ * default flink deploy mode
+ */
+ public static final FlinkDeployMode DEFAULT_DEPLOY_MODE =
FlinkDeployMode.CLUSTER;
+
+ /**
+ * build flink command line
+ *
+ * @param param flink parameters
+ * @return argument list
+ */
+ public static List<String> buildCommandLine(TaskExecutionContext
taskExecutionContext, FlinkParameters param) {
+ switch (param.getProgramType()) {
+ case SQL:
+ return buildCommandLineForSql(taskExecutionContext, param);
+ default:
+ return buildCommandLineForOthers(taskExecutionContext, param);
+ }
+ }
+
+ /**
+ * build flink command line for SQL
+ *
+ * @return argument list
+ */
+ private static List<String> buildCommandLineForSql(TaskExecutionContext
taskExecutionContext, FlinkParameters flinkParameters) {
+ List<String> args = new ArrayList<>();
+
+ args.add(FlinkConstants.FLINK_SQL_COMMAND);
+
+ // -i
+ String initScriptFilePath =
FileUtils.getInitScriptFilePath(taskExecutionContext);
+ args.add(FlinkConstants.FLINK_SQL_INIT_FILE);
+ args.add(initScriptFilePath);
+
+ // -f
+ String scriptFilePath =
FileUtils.getScriptFilePath(taskExecutionContext);
+ args.add(FlinkConstants.FLINK_SQL_SCRIPT_FILE);
+ args.add(scriptFilePath);
+
+ String others = flinkParameters.getOthers();
+ if (StringUtils.isNotEmpty(others)) {
+ args.add(others);
+ }
+ return args;
+ }
+
+ public static List<String> buildInitOptionsForSql(FlinkParameters
flinkParameters) {
+ List<String> initOptions = new ArrayList<>();
+
+ FlinkDeployMode deployMode =
Optional.ofNullable(flinkParameters.getDeployMode()).orElse(FlinkDeployMode.CLUSTER);
+
+ /**
+ * Currently flink sql on yarn only supports yarn-per-job mode
+ */
+ if (FlinkDeployMode.CLUSTER == deployMode) {
+ // execution.target
+
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET,
"local"));
+ } else {
+ // execution.target
+
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET,
FlinkConstants.FLINK_YARN_PER_JOB));
+
+ // taskmanager.numberOfTaskSlots
+ int slot = flinkParameters.getSlot();
+ if (slot > 0) {
+
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_NUMBEROFTASKSLOTS,
slot));
+ }
+
+ // yarn.application.name
+ String appName = flinkParameters.getAppName();
+ if (StringUtils.isNotEmpty(appName)) {
+
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_NAME,
ArgsUtils.escape(appName)));
+ }
+
+ // jobmanager.memory.process.size
+ String jobManagerMemory = flinkParameters.getJobManagerMemory();
+ if (StringUtils.isNotEmpty(jobManagerMemory)) {
+
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_JOBMANAGER_MEMORY_PROCESS_SIZE,
jobManagerMemory));
+ }
+
+ // taskmanager.memory.process.size
+ String taskManagerMemory = flinkParameters.getTaskManagerMemory();
+ if (StringUtils.isNotEmpty(taskManagerMemory)) {
+
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_MEMORY_PROCESS_SIZE,
taskManagerMemory));
+ }
+
+ // yarn.application.queue
+ String others = flinkParameters.getOthers();
+ if (StringUtils.isEmpty(others) ||
!others.contains(FlinkConstants.FLINK_QUEUE)) {
+ String queue = flinkParameters.getQueue();
+ if (StringUtils.isNotEmpty(queue)) {
+
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE,
queue));
+ }
+ }
+ }
+
+ // parallelism.default
+ int parallelism = flinkParameters.getParallelism();
+ if (parallelism > 0) {
+
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_PARALLELISM_DEFAULT,
parallelism));
+ }
+
+ return initOptions;
+ }
+
+ private static List<String> buildCommandLineForOthers(TaskExecutionContext
taskExecutionContext, FlinkParameters flinkParameters) {
+ List<String> args = new ArrayList<>();
+
+ args.add(FlinkConstants.FLINK_COMMAND);
+ FlinkDeployMode deployMode =
Optional.ofNullable(flinkParameters.getDeployMode()).orElse(DEFAULT_DEPLOY_MODE);
+ String flinkVersion = flinkParameters.getFlinkVersion();
+ // build run command
+ switch (deployMode) {
+ case CLUSTER:
+ if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion) ||
FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) {
+ args.add(FlinkConstants.FLINK_RUN); //run
+ args.add(FlinkConstants.FLINK_EXECUTION_TARGET); //-t
+ args.add(FlinkConstants.FLINK_YARN_PER_JOB);
//yarn-per-job
+ } else {
+ args.add(FlinkConstants.FLINK_RUN); //run
+ args.add(FlinkConstants.FLINK_RUN_MODE); //-m
+ args.add(FlinkConstants.FLINK_YARN_CLUSTER);
//yarn-cluster
+ }
+ break;
+ case APPLICATION:
+ args.add(FlinkConstants.FLINK_RUN_APPLICATION);
//run-application
+ args.add(FlinkConstants.FLINK_EXECUTION_TARGET); //-t
+ args.add(FlinkConstants.FLINK_YARN_APPLICATION);
//yarn-application
+ break;
+ case LOCAL:
+ args.add(FlinkConstants.FLINK_RUN); //run
+ break;
+ }
+
+ String others = flinkParameters.getOthers();
+
+ // build args
+ switch (deployMode) {
+ case CLUSTER:
+ case APPLICATION:
+ int slot = flinkParameters.getSlot();
+ if (slot > 0) {
+ args.add(FlinkConstants.FLINK_YARN_SLOT);
+ args.add(String.format("%d", slot)); //-ys
+ }
+
+ String appName = flinkParameters.getAppName();
+ if (StringUtils.isNotEmpty(appName)) { //-ynm
+ args.add(FlinkConstants.FLINK_APP_NAME);
+ args.add(ArgsUtils.escape(appName));
+ }
+
+ // judge flink version, the parameter -yn has removed from
flink 1.10
+ if (flinkVersion == null ||
FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
+ int taskManager = flinkParameters.getTaskManager();
+ if (taskManager > 0) { //-yn
+ args.add(FlinkConstants.FLINK_TASK_MANAGE);
+ args.add(String.format("%d", taskManager));
+ }
+ }
+ String jobManagerMemory =
flinkParameters.getJobManagerMemory();
+ if (StringUtils.isNotEmpty(jobManagerMemory)) {
+ args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM);
+ args.add(jobManagerMemory); //-yjm
+ }
+
+ String taskManagerMemory =
flinkParameters.getTaskManagerMemory();
+ if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm
+ args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM);
+ args.add(taskManagerMemory);
+ }
+
+ if (StringUtils.isEmpty(others) ||
!others.contains(FlinkConstants.FLINK_QUEUE)) {
+ String queue = flinkParameters.getQueue();
+ if (StringUtils.isNotEmpty(queue)) { // -yqu
+ args.add(FlinkConstants.FLINK_QUEUE);
+ args.add(queue);
+ }
+ }
+ break;
+ case LOCAL:
+ break;
+ }
+
+ int parallelism = flinkParameters.getParallelism();
+ if (parallelism > 0) {
+ args.add(FlinkConstants.FLINK_PARALLELISM);
+ args.add(String.format("%d", parallelism)); // -p
+ }
+
+ // If the job is submitted in attached mode, perform a best-effort
cluster shutdown when the CLI is terminated abruptly
+ // The task status will be synchronized with the cluster job status
+ args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
+
+ // -s -yqu -yat -yD -D
+ if (StringUtils.isNotEmpty(others)) {
+ args.add(others);
+ }
+
+ ProgramType programType = flinkParameters.getProgramType();
+ String mainClass = flinkParameters.getMainClass();
+ if (programType != null && programType != ProgramType.PYTHON &&
StringUtils.isNotEmpty(mainClass)) {
+ args.add(FlinkConstants.FLINK_MAIN_CLASS); //-c
+ args.add(flinkParameters.getMainClass()); //main class
+ }
+
+ ResourceInfo mainJar = flinkParameters.getMainJar();
+ if (mainJar != null) {
+ // -py
+ if(ProgramType.PYTHON == programType) {
+ args.add(FlinkConstants.FLINK_PYTHON);
+ }
+ args.add(mainJar.getRes());
+ }
+
+ String mainArgs = flinkParameters.getMainArgs();
+ if (StringUtils.isNotEmpty(mainArgs)) {
+ Map<String, Property> paramsMap =
taskExecutionContext.getPrepareParamsMap();
+ args.add(ParameterUtils.convertParameterPlaceholders(mainArgs,
ParamUtils.convert(paramsMap)));
+ }
+
+ return args;
+ }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
index 42cb5ad78c..91e4efc125 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
@@ -39,8 +39,12 @@ public class FlinkConstants {
/**
* flink run options
*/
+ public static final String FLINK_RUN_APPLICATION = "run-application";
public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
+ public static final String FLINK_YARN_APPLICATION = "yarn-application";
+ public static final String FLINK_YARN_PER_JOB = "yarn-per-job";
public static final String FLINK_RUN_MODE = "-m";
+ public static final String FLINK_EXECUTION_TARGET = "-t";
public static final String FLINK_YARN_SLOT = "-ys";
public static final String FLINK_APP_NAME = "-ynm";
public static final String FLINK_QUEUE = "-yqu";
@@ -51,7 +55,7 @@ public class FlinkConstants {
public static final String FLINK_PARALLELISM = "-p";
public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae";
public static final String FLINK_PYTHON = "-py";
-
+ // For Flink SQL
public static final String FLINK_FORMAT_EXECUTION_TARGET = "set
execution.target=%s";
public static final String FLINK_FORMAT_YARN_APPLICATION_NAME = "set
yarn.application.name=%s";
public static final String FLINK_FORMAT_YARN_APPLICATION_QUEUE = "set
yarn.application.queue=%s";
@@ -62,12 +66,4 @@ public class FlinkConstants {
public static final String FLINK_SQL_SCRIPT_FILE = "-f";
public static final String FLINK_SQL_INIT_FILE = "-i";
public static final String FLINK_SQL_NEWLINE = ";\n";
-
- // execution.target options
- public static final String EXECUTION_TARGET_YARN_PER_JOB = "yarn-per-job";
- public static final String EXECUTION_TARGET_LOACL = "local";
-
- public static final String DEPLOY_MODE_CLUSTER = "cluster";
- public static final String DEPLOY_MODE_LOCAL = "local";
- public static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkDeployMode.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkDeployMode.java
new file mode 100644
index 0000000000..b02cd40f92
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkDeployMode.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Flink deploy mode
+ */
+public enum FlinkDeployMode {
+ @JsonProperty("local")
+ LOCAL,
+ @JsonProperty("cluster")
+ CLUSTER,
+ @JsonProperty("application")
+ APPLICATION
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java
index dca6fb58a3..8b45641989 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java
@@ -39,9 +39,9 @@ public class FlinkParameters extends AbstractParameters {
private String mainClass;
/**
- * deploy mode yarn-cluster yarn-local
+ * deploy mode yarn-cluster yarn-local yarn-application
*/
- private String deployMode;
+ private FlinkDeployMode deployMode;
/**
* arguments
@@ -130,11 +130,11 @@ public class FlinkParameters extends AbstractParameters {
this.mainClass = mainClass;
}
- public String getDeployMode() {
+ public FlinkDeployMode getDeployMode() {
return deployMode;
}
- public void setDeployMode(String deployMode) {
+ public void setDeployMode(FlinkDeployMode deployMode) {
this.deployMode = deployMode;
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
index f04282a144..5a78eed1bd 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.plugin.task.flink;
-import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
-
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@@ -26,26 +24,14 @@ import
org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
-import org.apache.commons.lang3.SystemUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.nio.file.attribute.FileAttribute;
-import java.nio.file.attribute.PosixFilePermission;
-import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
public class FlinkTask extends AbstractYarnTask {
@@ -66,7 +52,6 @@ public class FlinkTask extends AbstractYarnTask {
@Override
public void init() {
-
logger.info("flink task params {}",
taskExecutionContext.getTaskParams());
flinkParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
FlinkParameters.class);
@@ -75,10 +60,9 @@ public class FlinkTask extends AbstractYarnTask {
throw new RuntimeException("flink task params is not valid");
}
flinkParameters.setQueue(taskExecutionContext.getQueue());
+ setMainJarName();
- if (ProgramType.SQL != flinkParameters.getProgramType()) {
- setMainJarName();
- }
+ FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
}
/**
@@ -88,293 +72,16 @@ public class FlinkTask extends AbstractYarnTask {
*/
@Override
protected String buildCommand() {
- List<String> args = new ArrayList<>();
+ // flink run/run-application [OPTIONS] <jar-file> <arguments>
+ List<String> args =
FlinkArgsUtils.buildCommandLine(taskExecutionContext, flinkParameters);
+
+ String command = ParameterUtils
+ .convertParameterPlaceholders(String.join(" ", args),
taskExecutionContext.getDefinedParams());
- if (ProgramType.SQL != flinkParameters.getProgramType()) {
- // execute flink run [OPTIONS] <jar-file> <arguments>
- args.add(FlinkConstants.FLINK_COMMAND);
- args.add(FlinkConstants.FLINK_RUN);
- args.addAll(populateFlinkOptions());
- } else {
- // execute sql-client.sh -f <script file>
- args.add(FlinkConstants.FLINK_SQL_COMMAND);
- args.addAll(populateFlinkSqlOptions());
- }
- String command =
ParameterUtils.convertParameterPlaceholders(String.join(" ", args),
taskExecutionContext.getDefinedParams());
logger.info("flink task command : {}", command);
return command;
}
- /**
- * build flink options
- *
- * @return argument list
- */
- private List<String> populateFlinkOptions() {
- List<String> args = new ArrayList<>();
-
- String deployMode =
StringUtils.isNotEmpty(flinkParameters.getDeployMode()) ?
flinkParameters.getDeployMode() : FlinkConstants.DEPLOY_MODE_CLUSTER;
-
- if (!FlinkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
- populateFlinkOnYarnOptions(args);
- }
-
- // -p
- int parallelism = flinkParameters.getParallelism();
- if (parallelism > 0) {
- args.add(FlinkConstants.FLINK_PARALLELISM);
- args.add(String.format("%d", parallelism));
- }
-
- /**
- * -sae
- *
- * If the job is submitted in attached mode, perform a best-effort
cluster shutdown when the CLI is terminated abruptly.
- * The task status will be synchronized with the cluster job status.
- */
- args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT);
-
- // -s -yqu -yat -yD -D
- String others = flinkParameters.getOthers();
- if (StringUtils.isNotEmpty(others)) {
- args.add(others);
- }
-
- // -c
- ProgramType programType = flinkParameters.getProgramType();
- String mainClass = flinkParameters.getMainClass();
- if (programType != ProgramType.PYTHON &&
StringUtils.isNotEmpty(mainClass)) {
- args.add(FlinkConstants.FLINK_MAIN_CLASS);
- args.add(flinkParameters.getMainClass());
- }
-
- ResourceInfo mainJar = flinkParameters.getMainJar();
- if (mainJar != null) {
- // -py
- if(ProgramType.PYTHON == programType) {
- args.add(FlinkConstants.FLINK_PYTHON);
- }
- args.add(mainJar.getRes());
- }
-
- String mainArgs = flinkParameters.getMainArgs();
- if (StringUtils.isNotEmpty(mainArgs)) {
- // combining local and global parameters
- Map<String, Property> paramsMap =
taskExecutionContext.getPrepareParamsMap();
- args.add(ParameterUtils.convertParameterPlaceholders(mainArgs,
ParamUtils.convert(paramsMap)));
- }
-
- return args;
- }
-
- private void populateFlinkOnYarnOptions(List<String> args) {
- // -m yarn-cluster
- args.add(FlinkConstants.FLINK_RUN_MODE);
- args.add(FlinkConstants.FLINK_YARN_CLUSTER);
-
- // -ys
- int slot = flinkParameters.getSlot();
- if (slot > 0) {
- args.add(FlinkConstants.FLINK_YARN_SLOT);
- args.add(String.format("%d", slot));
- }
-
- // -ynm
- String appName = flinkParameters.getAppName();
- if (StringUtils.isNotEmpty(appName)) {
- args.add(FlinkConstants.FLINK_APP_NAME);
- args.add(ArgsUtils.escape(appName));
- }
-
- /**
- * -yn
- *
- * Note: judge flink version, the parameter -yn has removed from flink
1.10
- */
- String flinkVersion = flinkParameters.getFlinkVersion();
- if (flinkVersion == null ||
FlinkConstants.FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
- int taskManager = flinkParameters.getTaskManager();
- if (taskManager > 0) {
- args.add(FlinkConstants.FLINK_TASK_MANAGE);
- args.add(String.format("%d", taskManager));
- }
- }
-
- // -yjm
- String jobManagerMemory = flinkParameters.getJobManagerMemory();
- if (StringUtils.isNotEmpty(jobManagerMemory)) {
- args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM);
- args.add(jobManagerMemory);
- }
-
- // -ytm
- String taskManagerMemory = flinkParameters.getTaskManagerMemory();
- if (StringUtils.isNotEmpty(taskManagerMemory)) {
- args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM);
- args.add(taskManagerMemory);
- }
-
- // -yqu
- String others = flinkParameters.getOthers();
- if (StringUtils.isEmpty(others) ||
!others.contains(FlinkConstants.FLINK_QUEUE)) {
- String queue = flinkParameters.getQueue();
- if (StringUtils.isNotEmpty(queue)) {
- args.add(FlinkConstants.FLINK_QUEUE);
- args.add(queue);
- }
- }
- }
-
- /**
- * build flink sql options
- *
- * @return argument list
- */
- private List<String> populateFlinkSqlOptions() {
- List<String> args = new ArrayList<>();
- List<String> defalutOptions = new ArrayList<>();
-
- String deployMode =
StringUtils.isNotEmpty(flinkParameters.getDeployMode()) ?
flinkParameters.getDeployMode() : FlinkConstants.DEPLOY_MODE_CLUSTER;
-
- /**
- * Currently flink sql on yarn only supports yarn-per-job mode
- */
- if (!FlinkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
- populateFlinkSqlOnYarnOptions(defalutOptions);
- } else {
- // execution.target
-
defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET,
FlinkConstants.EXECUTION_TARGET_LOACL));
- }
-
- // parallelism.default
- int parallelism = flinkParameters.getParallelism();
- if (parallelism > 0) {
-
defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_PARALLELISM_DEFAULT,
parallelism));
- }
-
- // -i
- args.add(FlinkConstants.FLINK_SQL_INIT_FILE);
- args.add(generateInitScriptFile(StringUtils.join(defalutOptions,
FlinkConstants.FLINK_SQL_NEWLINE).concat(FlinkConstants.FLINK_SQL_NEWLINE)));
-
- // -f
- args.add(FlinkConstants.FLINK_SQL_SCRIPT_FILE);
- args.add(generateScriptFile());
-
- String others = flinkParameters.getOthers();
- if (StringUtils.isNotEmpty(others)) {
- args.add(others);
- }
- return args;
- }
-
- private void populateFlinkSqlOnYarnOptions(List<String> defalutOptions) {
- // execution.target
-
defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET,
FlinkConstants.EXECUTION_TARGET_YARN_PER_JOB));
-
- // taskmanager.numberOfTaskSlots
- int slot = flinkParameters.getSlot();
- if (slot > 0) {
-
defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_NUMBEROFTASKSLOTS,
slot));
- }
-
- // yarn.application.name
- String appName = flinkParameters.getAppName();
- if (StringUtils.isNotEmpty(appName)) {
-
defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_NAME,
ArgsUtils.escape(appName)));
- }
-
- // jobmanager.memory.process.size
- String jobManagerMemory = flinkParameters.getJobManagerMemory();
- if (StringUtils.isNotEmpty(jobManagerMemory)) {
-
defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_JOBMANAGER_MEMORY_PROCESS_SIZE,
jobManagerMemory));
- }
-
- // taskmanager.memory.process.size
- String taskManagerMemory = flinkParameters.getTaskManagerMemory();
- if (StringUtils.isNotEmpty(taskManagerMemory)) {
-
defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_MEMORY_PROCESS_SIZE,
taskManagerMemory));
- }
-
- // yarn.application.queue
- String others = flinkParameters.getOthers();
- if (StringUtils.isEmpty(others) ||
!others.contains(FlinkConstants.FLINK_QUEUE)) {
- String queue = flinkParameters.getQueue();
- if (StringUtils.isNotEmpty(queue)) {
-
defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE,
queue));
- }
- }
- }
-
- private String generateInitScriptFile(String parameters) {
- String initScriptFileName = String.format("%s/%s_init.sql",
taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
-
- File file = new File(initScriptFileName);
- Path path = file.toPath();
-
- if (!Files.exists(path)) {
- Set<PosixFilePermission> perms =
PosixFilePermissions.fromString(RWXR_XR_X);
- FileAttribute<Set<PosixFilePermission>> attr =
PosixFilePermissions.asFileAttribute(perms);
- try {
- if (SystemUtils.IS_OS_WINDOWS) {
- Files.createFile(path);
- } else {
- if (!file.getParentFile().exists()) {
- file.getParentFile().mkdirs();
- }
- Files.createFile(path, attr);
- }
-
- // Flink sql common parameters are written to the script file
- logger.info("common parameters : {}", parameters);
- Files.write(path, parameters.getBytes(),
StandardOpenOption.APPEND);
-
- // Flink init script is written to the script file
- if (StringUtils.isNotEmpty(flinkParameters.getInitScript())) {
- String script =
flinkParameters.getInitScript().replaceAll("\\r\\n", "\n");
- flinkParameters.setInitScript(script);
- logger.info("init script : {}",
flinkParameters.getInitScript());
- Files.write(path,
flinkParameters.getInitScript().getBytes(), StandardOpenOption.APPEND);
- }
- } catch (IOException e) {
- throw new RuntimeException("generate flink sql script error",
e);
- }
- }
- return initScriptFileName;
- }
-
- private String generateScriptFile() {
- String scriptFileName = String.format("%s/%s_node.sql",
taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
-
- File file = new File(scriptFileName);
- Path path = file.toPath();
-
- if (!Files.exists(path)) {
- String script =
flinkParameters.getRawScript().replaceAll("\\r\\n", "\n");
- flinkParameters.setRawScript(script);
-
- logger.info("raw script : {}", flinkParameters.getRawScript());
- logger.info("task execute path : {}",
taskExecutionContext.getExecutePath());
-
- Set<PosixFilePermission> perms =
PosixFilePermissions.fromString(RWXR_XR_X);
- FileAttribute<Set<PosixFilePermission>> attr =
PosixFilePermissions.asFileAttribute(perms);
- try {
- if (SystemUtils.IS_OS_WINDOWS) {
- Files.createFile(path);
- } else {
- if (!file.getParentFile().exists()) {
- file.getParentFile().mkdirs();
- }
- Files.createFile(path, attr);
- }
- // Flink sql raw script is written to the script file
- Files.write(path, flinkParameters.getRawScript().getBytes(),
StandardOpenOption.APPEND);
- } catch (IOException e) {
- throw new RuntimeException("generate flink sql script error",
e);
- }
- }
- return scriptFileName;
- }
-
@Override
protected void setMainJarName() {
ResourceInfo mainJar = flinkParameters.getMainJar();
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
new file mode 100644
index 0000000000..b87960fe96
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class FlinkArgsUtilsTest {
+
+ private String joinStringListWithSpace(List<String> stringList) {
+ return String.join(" ", stringList);
+ }
+
+ private FlinkParameters
buildTestFlinkParametersWithDeployMode(FlinkDeployMode flinkDeployMode) {
+ FlinkParameters flinkParameters = new FlinkParameters();
+ flinkParameters.setProgramType(ProgramType.SCALA);
+ flinkParameters.setDeployMode(flinkDeployMode);
+ flinkParameters.setParallelism(4);
+ ResourceInfo resourceInfo = new ResourceInfo();
+ resourceInfo.setId(1);
+ resourceInfo.setResourceName("job");
+ resourceInfo.setRes("/opt/job.jar");
+ flinkParameters.setMainJar(resourceInfo);
+ flinkParameters.setMainClass("org.example.Main");
+ flinkParameters.setSlot(4);
+ flinkParameters.setAppName("demo-app-name");
+ flinkParameters.setJobManagerMemory("1024m");
+ flinkParameters.setTaskManagerMemory("1024m");
+
+ return flinkParameters;
+ }
+ private TaskExecutionContext buildTestTaskExecutionContext() {
+ TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+ taskExecutionContext.setTaskAppId("app-id");
+ taskExecutionContext.setExecutePath("/tmp/execution");
+ return taskExecutionContext;
+ }
+
+ @Test
+ public void testRunJarInApplicationMode() throws Exception {
+ FlinkParameters flinkParameters =
buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION);
+ List<String> commandLine =
FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
+
+ Assert.assertEquals(
+ "flink run-application -t yarn-application -ys 4 -ynm
demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+ joinStringListWithSpace(commandLine));
+ }
+
+ @Test
+ public void testRunJarInClusterMode() throws Exception {
+ FlinkParameters flinkParameters =
buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER);
+ flinkParameters.setFlinkVersion("1.11");
+ List<String> commandLine1 =
FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
+
+ Assert.assertEquals(
+ "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+ joinStringListWithSpace(commandLine1));
+
+ flinkParameters.setFlinkVersion("<1.10");
+ List<String> commandLine2 =
FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
+
+ Assert.assertEquals(
+ "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+ joinStringListWithSpace(commandLine2));
+
+ flinkParameters.setFlinkVersion(">=1.12");
+ List<String> commandLine3 =
FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
+
+ Assert.assertEquals(
+ "flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+ joinStringListWithSpace(commandLine3));
+ }
+
+ @Test
+ public void testRunJarInLocalMode() throws Exception {
+ FlinkParameters flinkParameters =
buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL);
+ List<String> commandLine =
FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
+
+ Assert.assertEquals(
+ "flink run -p 4 -sae -c org.example.Main /opt/job.jar",
+ joinStringListWithSpace(commandLine));
+ }
+
+ @Test
+ public void testRunSql() throws Exception {
+ FlinkParameters flinkParameters =
buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER);
+ flinkParameters.setProgramType(ProgramType.SQL);
+ List<String> commandLine =
FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
+
+ Assert.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql
-f /tmp/execution/app-id_node.sql",
+ joinStringListWithSpace(commandLine));
+ }
+
+ @Test
+ public void testInitOptionsInClusterMode() throws Exception {
+ List<String> initOptions =
FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER));
+ Assert.assertEquals(2, initOptions.size());
+ Assert.assertTrue(initOptions.contains("set execution.target=local"));
+ Assert.assertTrue(initOptions.contains("set parallelism.default=4"));
+ }
+
+ @Test
+ public void testInitOptionsInApplicationMode() throws Exception {
+ List<String> initOptions =
FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION));
+ Assert.assertEquals(6, initOptions.size());
+ Assert.assertTrue(initOptions.contains("set
execution.target=yarn-per-job"));
+ Assert.assertTrue(initOptions.contains("set
taskmanager.numberOfTaskSlots=4"));
+ Assert.assertTrue(initOptions.contains("set
yarn.application.name=demo-app-name"));
+ Assert.assertTrue(initOptions.contains("set
jobmanager.memory.process.size=1024m"));
+ Assert.assertTrue(initOptions.contains("set
taskmanager.memory.process.size=1024m"));
+ Assert.assertTrue(initOptions.contains("set parallelism.default=4"));
+ }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java
deleted file mode 100644
index fbb2d06951..0000000000
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.flink;
-
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
-import org.apache.dolphinscheduler.spi.utils.JSONUtils;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Collections;
-
-import static org.powermock.api.mockito.PowerMockito.spy;
-import static org.powermock.api.mockito.PowerMockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({
- JSONUtils.class
-})
-@PowerMockIgnore({"javax.*"})
-public class FlinkTaskTest {
-
- @Test
- public void testBuildCommand() {
- String parameters = buildFlinkParameters();
- TaskExecutionContext taskExecutionContext =
PowerMockito.mock(TaskExecutionContext.class);
- when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
- when(taskExecutionContext.getQueue()).thenReturn("default");
- FlinkTask flinkTask = spy(new FlinkTask(taskExecutionContext));
- flinkTask.init();
- Assert.assertEquals(
- "flink run " +
- "-m yarn-cluster " +
- "-ys 1 " +
- "-ynm TopSpeedWindowing " +
- "-yjm 1G " +
- "-ytm 1G " +
- "-yqu default " +
- "-p 2 -sae " +
- "-c
org.apache.flink.streaming.examples.windowing.TopSpeedWindowing " +
- "TopSpeedWindowing.jar", flinkTask.buildCommand());
- }
-
- @Test
- public void testBuildCommandWithFlinkSql() {
- String parameters = buildFlinkParametersWithFlinkSql();
- TaskExecutionContext taskExecutionContext =
PowerMockito.mock(TaskExecutionContext.class);
- when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
- when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
- when(taskExecutionContext.getTaskAppId()).thenReturn("4483");
- FlinkTask flinkTask = spy(new FlinkTask(taskExecutionContext));
- flinkTask.init();
- Assert.assertEquals("sql-client.sh -i /tmp/4483_init.sql -f
/tmp/4483_node.sql", flinkTask.buildCommand());
- }
-
- private String buildFlinkParameters() {
- ResourceInfo resource = new ResourceInfo();
- resource.setId(2);
- resource.setResourceName("/TopSpeedWindowing.jar");
- resource.setRes("TopSpeedWindowing.jar");
-
- FlinkParameters parameters = new FlinkParameters();
- parameters.setLocalParams(Collections.emptyList());
- parameters.setResourceList(Collections.emptyList());
- parameters.setProgramType(ProgramType.JAVA);
-
parameters.setMainClass("org.apache.flink.streaming.examples.windowing.TopSpeedWindowing");
- parameters.setMainJar(resource);
- parameters.setDeployMode("cluster");
- parameters.setAppName("TopSpeedWindowing");
- parameters.setFlinkVersion(">=1.10");
- parameters.setJobManagerMemory("1G");
- parameters.setTaskManagerMemory("1G");
- parameters.setSlot(1);
- parameters.setTaskManager(2);
- parameters.setParallelism(2);
- return JSONUtils.toJsonString(parameters);
- }
-
- private String buildFlinkParametersWithFlinkSql() {
- FlinkParameters parameters = new FlinkParameters();
- parameters.setLocalParams(Collections.emptyList());
- parameters.setInitScript("set
sql-client.execution.result-mode=tableau;");
- parameters.setRawScript("selcet 11111;");
- parameters.setProgramType(ProgramType.SQL);
- parameters.setMainClass("");
- parameters.setDeployMode("cluster");
- parameters.setAppName("FlinkSQL");
- parameters.setOthers("");
- parameters.setJobManagerMemory("1G");
- parameters.setTaskManagerMemory("1G");
- parameters.setParallelism(1);
- parameters.setFlinkVersion(">=1.10");
- return JSONUtils.toJsonString(parameters);
- }
-}
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
index 568d126ea2..7b61477bc5 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-import { computed, ref, watchEffect } from 'vue'
+import { computed, watch, watchEffect } from 'vue'
import { useI18n } from 'vue-i18n'
-import { useCustomParams, useDeployMode, useMainJar, useResources } from '.'
+import { useCustomParams, useMainJar, useResources } from '.'
import type { IJsonItem } from '../types'
export function useFlink(model: { [field: string]: any }): IJsonItem[] {
@@ -36,14 +36,53 @@ export function useFlink(model: { [field: string]: any }):
IJsonItem[] {
)
const taskManagerNumberSpan = computed(() =>
- model.flinkVersion === '<1.10' && model.deployMode === 'cluster' ? 12 : 0
+ model.flinkVersion === '<1.10' && model.deployMode !== 'local' ? 12 : 0
)
const deployModeSpan = computed(() =>
- model.deployMode === 'cluster' ? 12 : 0
+ model.deployMode !== 'local' ? 12 : 0
)
- const appNameSpan = computed(() => (model.deployMode === 'cluster' ? 24 : 0))
+ const appNameSpan = computed(() => (model.deployMode !== 'local' ? 24 : 0))
+
+ const deployModeOptions = computed(() => {
+ if (model.flinkVersion === '<1.10') {
+ return [
+ {
+ label: 'cluster',
+ value: 'cluster'
+ },
+ {
+ label: 'local',
+ value: 'local'
+ }
+ ];
+ } else {
+ return [
+ {
+ label: 'per-job/cluster',
+ value: 'cluster'
+ },
+ {
+ label: 'application',
+ value: 'application'
+ },
+ {
+ label: 'local',
+ value: 'local'
+ }
+ ];
+ }
+ })
+
+ watch(
+ () => model.flinkVersion,
+ () => {
+ if (model.flinkVersion === '<1.10' && model.deployMode ===
'application') {
+ model.deployMode = 'cluster'
+ }
+ }
+ )
watchEffect(() => {
model.flinkVersion = model.programType === 'SQL' ? '>=1.13' : '<1.10'
@@ -86,7 +125,13 @@ export function useFlink(model: { [field: string]: any }):
IJsonItem[] {
}
},
useMainJar(model),
- useDeployMode(24, ref(false)),
+ {
+ type: 'radio',
+ field: 'deployMode',
+ name: t('project.node.deploy_mode'),
+ options: deployModeOptions,
+ span: 24
+ },
{
type: 'editor',
field: 'initScript',
@@ -269,7 +314,11 @@ const FLINK_VERSIONS = [
value: '<1.10'
},
{
- label: '>=1.10',
- value: '>=1.10'
+ label: '1.11',
+ value: '1.11'
+ },
+ {
+ label: '>=1.12',
+ value: '>=1.12'
}
]