This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4a3c3e7797 [feature][task-flink] Support Flink application mode (#9577)
4a3c3e7797 is described below

commit 4a3c3e779781b432e43cc96ab6a729c060e084d7
Author: Paul Zhang <[email protected]>
AuthorDate: Fri Jul 1 12:20:38 2022 +0800

    [feature][task-flink] Support Flink application mode (#9577)
---
 docs/docs/en/guide/task/flink.md                   |  30 +-
 docs/docs/zh/guide/task/flink.md                   |   2 +-
 .../plugin/task/flink/FileUtils.java               | 103 +++++++
 .../plugin/task/flink/FlinkArgsUtils.java          | 271 ++++++++++++++++++
 .../plugin/task/flink/FlinkConstants.java          |  14 +-
 .../plugin/task/flink/FlinkDeployMode.java         |  32 +++
 .../plugin/task/flink/FlinkParameters.java         |   8 +-
 .../plugin/task/flink/FlinkTask.java               | 307 +--------------------
 .../plugin/task/flink/FlinkArgsUtilsTest.java      | 132 +++++++++
 .../plugin/task/flink/FlinkTaskTest.java           | 116 --------
 .../task/components/node/fields/use-flink.ts       |  65 ++++-
 11 files changed, 641 insertions(+), 439 deletions(-)

diff --git a/docs/docs/en/guide/task/flink.md b/docs/docs/en/guide/task/flink.md
index 821e16695b..44c9461803 100644
--- a/docs/docs/en/guide/task/flink.md
+++ b/docs/docs/en/guide/task/flink.md
@@ -30,7 +30,7 @@ Flink task type, used to execute Flink programs. For Flink 
nodes:
 | Program type | Support Java, Scala, Python and SQL four languages. |
 | Class of main function**: The **full path** of Main Class, the entry point 
of the Flink program. |
 | Main jar package | The jar package of the Flink program (upload by Resource 
Center). |
-| Deployment mode | Support 2 deployment modes: cluster and local. |
+| Deployment mode | Support 3 deployment modes: cluster, local and application 
(Flink 1.11 and later. See also [Run an application in Application 
Mode](https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode)).
 |
 | Initialization script | Script file to initialize session context. |
 | Script | The sql script file developed by the user that should be executed. |
 | Flink version | Select version according to the execution environment. |
@@ -45,6 +45,34 @@ Flink task type, used to execute Flink programs. For Flink 
nodes:
 | Resource | Appoint resource files in the `Resource` if parameters refer to 
them. |
 | Custom parameter | It is a local user-defined parameter for Flink, and will 
replace the content with `${variable}` in the script. |
 | Predecessor task | Selecting a predecessor task for the current task, will 
set the selected predecessor task as upstream of the current task. |
+- **Node name**: The node name in a workflow definition is unique.
+- **Run flag**: Identifies whether this node schedules normally, if it does 
not need to execute, select the `prohibition execution`.
+- **Descriptive information**: Describe the function of the node.
+- **Task priority**: When the number of worker threads is insufficient, 
execute in the order of priority from high to low, and tasks with the same 
priority will execute in a first-in first-out order.
+- **Worker grouping**: Assign tasks to the machines of the worker group to 
execute. If `Default` is selected, randomly select a worker machine for 
execution.
+- **Environment Name**: Configure the environment name in which run the script.
+- **Times of failed retry attempts**: The number of times the task failed to 
resubmit.
+- **Failed retry interval**: The time interval (unit minute) for resubmitting 
the task after a failed task.
+- **Delayed execution time**: The time (unit minute) that a task delays in 
execution.
+- **Timeout alarm**: Check the timeout alarm and timeout failure. When the 
task runs exceed the "timeout", an alarm email will send and the task execution 
will fail.
+- **Program type**: Support Java, Scala, Python and SQL four languages.
+- **The class of main function**: The **full path** of Main Class, the entry 
point of the Flink program.
+- **Main jar package**: The jar package of the Flink program (upload by 
Resource Center).
+- **Deployment mode**: Support 3 deployment modes: cluster, local and 
application (Flink 1.11 and later. See also [Run an application in Application 
Mode](https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode)).
+- **Initialization script**: Script file to initialize session context.
+- **Script**: The sql script file developed by the user that should be 
executed.
+- **Flink version**: Select version according to the execution env.
+- **Task name** (optional): Flink task name.
+- **JobManager memory size**: Used to set the size of jobManager memories, 
which can be set according to the actual production environment.
+- **Number of slots**: Used to set the number of slots, which can be set 
according to the actual production environment.
+- **TaskManager memory size**: Used to set the size of taskManager memories, 
which can be set according to the actual production environment.
+- **Number of TaskManager**: Used to set the number of taskManagers, which can 
be set according to the actual production environment.
+- **Parallelism**: Used to set the degree of parallelism for executing Flink 
tasks.
+- **Main program parameters**: Set the input parameters for the Flink program 
and support the substitution of custom parameter variables.
+- **Optional parameters**: Support `--jar`, `--files`,` --archives`, `--conf` 
format.
+- **Resource**: Appoint resource files in the `Resource` if parameters refer 
to them.
+- **Custom parameter**: It is a local user-defined parameter for Flink, and 
will replace the content with `${variable}` in the script.
+- **Predecessor task**: Selecting a predecessor task for the current task, 
will set the selected predecessor task as upstream of the current task.
 
 ## Task Example
 
diff --git a/docs/docs/zh/guide/task/flink.md b/docs/docs/zh/guide/task/flink.md
index 212f6ee208..2cfb9b4b6d 100644
--- a/docs/docs/zh/guide/task/flink.md
+++ b/docs/docs/zh/guide/task/flink.md
@@ -28,7 +28,7 @@ Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点:
 - 程序类型:支持 Java、Scala、 Python 和 SQL 四种语言。
 - 主函数的 Class:Flink 程序的入口 Main Class 的**全路径**。
 - 主程序包:执行 Flink 程序的 jar 包(通过资源中心上传)。
-- 部署方式:支持 cluster 和 local 两种模式的部署。
+- 部署方式:支持 cluster、 local 和 application (Flink 1.11和之后的版本支持,参见 [Run an 
application in Application 
Mode](https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode))
 三种模式的部署。
 - 初始化脚本:用于初始化会话上下文的脚本文件。
 - 脚本:用户开发的应该执行的 SQL 脚本文件。
 - Flink 版本:根据所需环境选择对应的版本即可。
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FileUtils.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FileUtils.java
new file mode 100644
index 0000000000..33f0fecbfb
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FileUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink;
+
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.Set;
+
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
+
+public class FileUtils {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileUtils.class);
+    private FileUtils() {}
+
+    public static String getInitScriptFilePath(TaskExecutionContext 
taskExecutionContext) {
+        return String.format("%s/%s_init.sql", 
taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
+    }
+
+    public static String getScriptFilePath(TaskExecutionContext 
taskExecutionContext) {
+        return String.format("%s/%s_node.sql", 
taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
+    }
+
+    public static void generateScriptFile(TaskExecutionContext 
taskExecutionContext, FlinkParameters flinkParameters) {
+        String initScriptFilePath = 
FileUtils.getInitScriptFilePath(taskExecutionContext);
+        String scriptFilePath = 
FileUtils.getScriptFilePath(taskExecutionContext);
+        String initOptionsString = StringUtils.join(
+                FlinkArgsUtils.buildInitOptionsForSql(flinkParameters),
+                FlinkConstants.FLINK_SQL_NEWLINE
+            ).concat(FlinkConstants.FLINK_SQL_NEWLINE);
+        writeScriptFile(initScriptFilePath, initOptionsString + 
flinkParameters.getInitScript());
+        writeScriptFile(scriptFilePath, flinkParameters.getRawScript());
+    }
+
+    private static void writeScriptFile(String scriptFileFullPath, String 
script) {
+        File scriptFile = new File(scriptFileFullPath);
+        Path path = scriptFile.toPath();
+        if (Files.exists(path)) {
+            try {
+                Files.delete(path);
+            } catch (IOException e) {
+                throw new RuntimeException(String.format("Flink Script file 
exists in path: %s before creation and cannot be deleted", path), e);
+            }
+        }
+
+        Set<PosixFilePermission> perms = 
PosixFilePermissions.fromString(RWXR_XR_X);
+        FileAttribute<Set<PosixFilePermission>> attr = 
PosixFilePermissions.asFileAttribute(perms);
+        try {
+            if (SystemUtils.IS_OS_WINDOWS) {
+                Files.createFile(path);
+            } else {
+                if (!scriptFile.getParentFile().exists()) {
+                    scriptFile.getParentFile().mkdirs();
+                }
+                Files.createFile(path, attr);
+            }
+
+            if (StringUtils.isNotEmpty(script)) {
+                String replacedScript = script.replaceAll("\\r\\n", "\n");
+                FileUtils.writeStringToFile(scriptFile, replacedScript, 
StandardOpenOption.APPEND);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Generate flink SQL script error", e);
+        }
+    }
+
+    private static void writeStringToFile(File file, String content, 
StandardOpenOption standardOpenOption) {
+        try {
+            LOGGER.info("Writing content: " + content);
+            LOGGER.info("To file: " + file.getAbsolutePath());
+            Files.write(file.getAbsoluteFile().toPath(), 
content.getBytes(StandardCharsets.UTF_8), standardOpenOption);
+        } catch(IOException e) {
+            throw new RuntimeException("Error writing file: " + 
file.getAbsoluteFile(), e);
+        }
+    }
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
new file mode 100644
index 0000000000..53f94ea0d1
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * flink args utils
+ */
+public class FlinkArgsUtils {
+
+    private FlinkArgsUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    private static final String LOCAL_DEPLOY_MODE = "local";
+    private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
+    private static final String FLINK_VERSION_AFTER_OR_EQUALS_1_12 = ">=1.12";
+    private static final String FLINK_VERSION_AFTER_OR_EQUALS_1_13 = ">=1.13";
+    /**
+     *  default flink deploy mode
+     */
+    public static final FlinkDeployMode DEFAULT_DEPLOY_MODE = 
FlinkDeployMode.CLUSTER;
+
+    /**
+     * build flink command line
+     *
+     * @param param flink parameters
+     * @return argument list
+     */
+    public static List<String> buildCommandLine(TaskExecutionContext 
taskExecutionContext, FlinkParameters param) {
+        switch (param.getProgramType()) {
+            case SQL:
+                return buildCommandLineForSql(taskExecutionContext, param);
+            default:
+                return buildCommandLineForOthers(taskExecutionContext, param);
+        }
+    }
+
+    /**
+     * build flink command line for SQL
+     *
+     * @return argument list
+     */
+    private static List<String> buildCommandLineForSql(TaskExecutionContext 
taskExecutionContext, FlinkParameters flinkParameters) {
+        List<String> args = new ArrayList<>();
+
+        args.add(FlinkConstants.FLINK_SQL_COMMAND);
+
+        // -i
+        String initScriptFilePath = 
FileUtils.getInitScriptFilePath(taskExecutionContext);
+        args.add(FlinkConstants.FLINK_SQL_INIT_FILE);
+        args.add(initScriptFilePath);
+
+        // -f
+        String scriptFilePath = 
FileUtils.getScriptFilePath(taskExecutionContext);
+        args.add(FlinkConstants.FLINK_SQL_SCRIPT_FILE);
+        args.add(scriptFilePath);
+
+        String others = flinkParameters.getOthers();
+        if (StringUtils.isNotEmpty(others)) {
+            args.add(others);
+        }
+        return args;
+    }
+
+    public static List<String> buildInitOptionsForSql(FlinkParameters 
flinkParameters) {
+        List<String> initOptions = new ArrayList<>();
+
+        FlinkDeployMode deployMode = 
Optional.ofNullable(flinkParameters.getDeployMode()).orElse(FlinkDeployMode.CLUSTER);
+
+        /**
+         * Currently flink sql on yarn only supports yarn-per-job mode
+         */
+        if (FlinkDeployMode.CLUSTER == deployMode) {
+            // execution.target
+            
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, 
"local"));
+        } else {
+            // execution.target
+            
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, 
FlinkConstants.FLINK_YARN_PER_JOB));
+
+            // taskmanager.numberOfTaskSlots
+            int slot = flinkParameters.getSlot();
+            if (slot > 0) {
+                
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_NUMBEROFTASKSLOTS,
 slot));
+            }
+
+            // yarn.application.name
+            String appName = flinkParameters.getAppName();
+            if (StringUtils.isNotEmpty(appName)) {
+                
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_NAME,
 ArgsUtils.escape(appName)));
+            }
+
+            // jobmanager.memory.process.size
+            String jobManagerMemory = flinkParameters.getJobManagerMemory();
+            if (StringUtils.isNotEmpty(jobManagerMemory)) {
+                
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_JOBMANAGER_MEMORY_PROCESS_SIZE,
 jobManagerMemory));
+            }
+
+            // taskmanager.memory.process.size
+            String taskManagerMemory = flinkParameters.getTaskManagerMemory();
+            if (StringUtils.isNotEmpty(taskManagerMemory)) {
+                
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_MEMORY_PROCESS_SIZE,
 taskManagerMemory));
+            }
+
+            // yarn.application.queue
+            String others = flinkParameters.getOthers();
+            if (StringUtils.isEmpty(others) || 
!others.contains(FlinkConstants.FLINK_QUEUE)) {
+                String queue = flinkParameters.getQueue();
+                if (StringUtils.isNotEmpty(queue)) {
+                    
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE,
 queue));
+                }
+            }
+        }
+
+        // parallelism.default
+        int parallelism = flinkParameters.getParallelism();
+        if (parallelism > 0) {
+            
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_PARALLELISM_DEFAULT, 
parallelism));
+        }
+
+        return initOptions;
+    }
+
+    private static List<String> buildCommandLineForOthers(TaskExecutionContext 
taskExecutionContext, FlinkParameters flinkParameters) {
+        List<String> args = new ArrayList<>();
+
+        args.add(FlinkConstants.FLINK_COMMAND);
+        FlinkDeployMode deployMode = 
Optional.ofNullable(flinkParameters.getDeployMode()).orElse(DEFAULT_DEPLOY_MODE);
+        String flinkVersion = flinkParameters.getFlinkVersion();
+        // build run command
+        switch (deployMode) {
+            case CLUSTER:
+                if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion) || 
FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) {
+                    args.add(FlinkConstants.FLINK_RUN);  //run
+                    args.add(FlinkConstants.FLINK_EXECUTION_TARGET);  //-t
+                    args.add(FlinkConstants.FLINK_YARN_PER_JOB);  
//yarn-per-job
+                } else {
+                    args.add(FlinkConstants.FLINK_RUN);  //run
+                    args.add(FlinkConstants.FLINK_RUN_MODE);  //-m
+                    args.add(FlinkConstants.FLINK_YARN_CLUSTER);  
//yarn-cluster
+                }
+                break;
+            case APPLICATION:
+                args.add(FlinkConstants.FLINK_RUN_APPLICATION);  
//run-application
+                args.add(FlinkConstants.FLINK_EXECUTION_TARGET);  //-t
+                args.add(FlinkConstants.FLINK_YARN_APPLICATION);  
//yarn-application
+                break;
+            case LOCAL:
+                args.add(FlinkConstants.FLINK_RUN);  //run
+                break;
+        }
+
+        String others = flinkParameters.getOthers();
+
+        // build args
+        switch (deployMode) {
+            case CLUSTER:
+            case APPLICATION:
+                int slot = flinkParameters.getSlot();
+                if (slot > 0) {
+                    args.add(FlinkConstants.FLINK_YARN_SLOT);
+                    args.add(String.format("%d", slot));   //-ys
+                }
+
+                String appName = flinkParameters.getAppName();
+                if (StringUtils.isNotEmpty(appName)) { //-ynm
+                    args.add(FlinkConstants.FLINK_APP_NAME);
+                    args.add(ArgsUtils.escape(appName));
+                }
+
+                // judge flink version, the parameter -yn has removed from 
flink 1.10
+                if (flinkVersion == null || 
FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
+                    int taskManager = flinkParameters.getTaskManager();
+                    if (taskManager > 0) {                        //-yn
+                        args.add(FlinkConstants.FLINK_TASK_MANAGE);
+                        args.add(String.format("%d", taskManager));
+                    }
+                }
+                String jobManagerMemory = 
flinkParameters.getJobManagerMemory();
+                if (StringUtils.isNotEmpty(jobManagerMemory)) {
+                    args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM);
+                    args.add(jobManagerMemory); //-yjm
+                }
+
+                String taskManagerMemory = 
flinkParameters.getTaskManagerMemory();
+                if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm
+                    args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM);
+                    args.add(taskManagerMemory);
+                }
+
+                if (StringUtils.isEmpty(others) || 
!others.contains(FlinkConstants.FLINK_QUEUE)) {
+                    String queue = flinkParameters.getQueue();
+                    if (StringUtils.isNotEmpty(queue)) { // -yqu
+                        args.add(FlinkConstants.FLINK_QUEUE);
+                        args.add(queue);
+                    }
+                }
+                break;
+            case LOCAL:
+                break;
+        }
+
+        int parallelism = flinkParameters.getParallelism();
+        if (parallelism > 0) {
+            args.add(FlinkConstants.FLINK_PARALLELISM);
+            args.add(String.format("%d", parallelism));   // -p
+        }
+
+        // If the job is submitted in attached mode, perform a best-effort 
cluster shutdown when the CLI is terminated abruptly
+        // The task status will be synchronized with the cluster job status
+        args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
+
+        // -s -yqu -yat -yD -D
+        if (StringUtils.isNotEmpty(others)) {
+            args.add(others);
+        }
+
+        ProgramType programType = flinkParameters.getProgramType();
+        String mainClass = flinkParameters.getMainClass();
+        if (programType != null && programType != ProgramType.PYTHON && 
StringUtils.isNotEmpty(mainClass)) {
+            args.add(FlinkConstants.FLINK_MAIN_CLASS);    //-c
+            args.add(flinkParameters.getMainClass());          //main class
+        }
+
+        ResourceInfo mainJar = flinkParameters.getMainJar();
+        if (mainJar != null) {
+            // -py
+            if(ProgramType.PYTHON == programType) {
+                args.add(FlinkConstants.FLINK_PYTHON);
+            }
+            args.add(mainJar.getRes());
+        }
+
+        String mainArgs = flinkParameters.getMainArgs();
+        if (StringUtils.isNotEmpty(mainArgs)) {
+            Map<String, Property> paramsMap = 
taskExecutionContext.getPrepareParamsMap();
+            args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, 
ParamUtils.convert(paramsMap)));
+        }
+
+        return args;
+    }
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
index 42cb5ad78c..91e4efc125 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
@@ -39,8 +39,12 @@ public class FlinkConstants {
     /**
      * flink run options
      */
+    public static final String FLINK_RUN_APPLICATION = "run-application";
     public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
+    public static final String FLINK_YARN_APPLICATION = "yarn-application";
+    public static final String FLINK_YARN_PER_JOB = "yarn-per-job";
     public static final String FLINK_RUN_MODE = "-m";
+    public static final String FLINK_EXECUTION_TARGET = "-t";
     public static final String FLINK_YARN_SLOT = "-ys";
     public static final String FLINK_APP_NAME = "-ynm";
     public static final String FLINK_QUEUE = "-yqu";
@@ -51,7 +55,7 @@ public class FlinkConstants {
     public static final String FLINK_PARALLELISM = "-p";
     public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae";
     public static final String FLINK_PYTHON = "-py";
-
+    // For Flink SQL
     public static final String FLINK_FORMAT_EXECUTION_TARGET = "set 
execution.target=%s";
     public static final String FLINK_FORMAT_YARN_APPLICATION_NAME = "set 
yarn.application.name=%s";
     public static final String FLINK_FORMAT_YARN_APPLICATION_QUEUE = "set 
yarn.application.queue=%s";
@@ -62,12 +66,4 @@ public class FlinkConstants {
     public static final String FLINK_SQL_SCRIPT_FILE = "-f";
     public static final String FLINK_SQL_INIT_FILE = "-i";
     public static final String FLINK_SQL_NEWLINE = ";\n";
-
-    // execution.target options
-    public static final String EXECUTION_TARGET_YARN_PER_JOB = "yarn-per-job";
-    public static final String EXECUTION_TARGET_LOACL = "local";
-
-    public static final String DEPLOY_MODE_CLUSTER = "cluster";
-    public static final String DEPLOY_MODE_LOCAL = "local";
-    public static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkDeployMode.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkDeployMode.java
new file mode 100644
index 0000000000..b02cd40f92
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkDeployMode.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Flink deploy mode
+ */
+public enum FlinkDeployMode {
+    @JsonProperty("local")
+    LOCAL,
+    @JsonProperty("cluster")
+    CLUSTER,
+    @JsonProperty("application")
+    APPLICATION
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java
index dca6fb58a3..8b45641989 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java
@@ -39,9 +39,9 @@ public class FlinkParameters extends AbstractParameters {
     private String mainClass;
 
     /**
-     * deploy mode  yarn-cluster yarn-local
+     * deploy mode  yarn-cluster yarn-local yarn-application
      */
-    private String deployMode;
+    private FlinkDeployMode deployMode;
 
     /**
      * arguments
@@ -130,11 +130,11 @@ public class FlinkParameters extends AbstractParameters {
         this.mainClass = mainClass;
     }
 
-    public String getDeployMode() {
+    public FlinkDeployMode getDeployMode() {
         return deployMode;
     }
 
-    public void setDeployMode(String deployMode) {
+    public void setDeployMode(FlinkDeployMode deployMode) {
         this.deployMode = deployMode;
     }
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
index f04282a144..5a78eed1bd 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
@@ -17,8 +17,6 @@
 
 package org.apache.dolphinscheduler.plugin.task.flink;
 
-import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
-
 import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@@ -26,26 +24,14 @@ import 
org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
 import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
-import org.apache.commons.lang3.SystemUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.nio.file.attribute.FileAttribute;
-import java.nio.file.attribute.PosixFilePermission;
-import java.nio.file.attribute.PosixFilePermissions;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 public class FlinkTask extends AbstractYarnTask {
 
@@ -66,7 +52,6 @@ public class FlinkTask extends AbstractYarnTask {
 
     @Override
     public void init() {
-
         logger.info("flink task params {}", 
taskExecutionContext.getTaskParams());
 
         flinkParameters = 
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
FlinkParameters.class);
@@ -75,10 +60,9 @@ public class FlinkTask extends AbstractYarnTask {
             throw new RuntimeException("flink task params is not valid");
         }
         flinkParameters.setQueue(taskExecutionContext.getQueue());
+        setMainJarName();
 
-        if (ProgramType.SQL != flinkParameters.getProgramType()) {
-            setMainJarName();
-        }
+        FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
     }
 
     /**
@@ -88,293 +72,16 @@ public class FlinkTask extends AbstractYarnTask {
      */
     @Override
     protected String buildCommand() {
-        List<String> args = new ArrayList<>();
+        // flink run/run-application [OPTIONS] <jar-file> <arguments>
+        List<String> args = 
FlinkArgsUtils.buildCommandLine(taskExecutionContext, flinkParameters);
+
+        String command = ParameterUtils
+                .convertParameterPlaceholders(String.join(" ", args), 
taskExecutionContext.getDefinedParams());
 
-        if (ProgramType.SQL != flinkParameters.getProgramType()) {
-            // execute flink run [OPTIONS] <jar-file> <arguments>
-            args.add(FlinkConstants.FLINK_COMMAND);
-            args.add(FlinkConstants.FLINK_RUN);
-            args.addAll(populateFlinkOptions());
-        } else {
-            // execute sql-client.sh -f <script file>
-            args.add(FlinkConstants.FLINK_SQL_COMMAND);
-            args.addAll(populateFlinkSqlOptions());
-        }
-        String command = 
ParameterUtils.convertParameterPlaceholders(String.join(" ", args), 
taskExecutionContext.getDefinedParams());
         logger.info("flink task command : {}", command);
         return command;
     }
 
-    /**
-     * build flink options
-     *
-     * @return argument list
-     */
-    private List<String> populateFlinkOptions() {
-        List<String> args = new ArrayList<>();
-
-        String deployMode = 
StringUtils.isNotEmpty(flinkParameters.getDeployMode()) ? 
flinkParameters.getDeployMode() : FlinkConstants.DEPLOY_MODE_CLUSTER;
-
-        if (!FlinkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
-            populateFlinkOnYarnOptions(args);
-        }
-
-        // -p
-        int parallelism = flinkParameters.getParallelism();
-        if (parallelism > 0) {
-            args.add(FlinkConstants.FLINK_PARALLELISM);
-            args.add(String.format("%d", parallelism));
-        }
-
-        /**
-         * -sae
-         *
-         * If the job is submitted in attached mode, perform a best-effort 
cluster shutdown when the CLI is terminated abruptly.
-         * The task status will be synchronized with the cluster job status.
-         */
-        args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT);
-
-        // -s -yqu -yat -yD -D
-        String others = flinkParameters.getOthers();
-        if (StringUtils.isNotEmpty(others)) {
-            args.add(others);
-        }
-
-        // -c
-        ProgramType programType = flinkParameters.getProgramType();
-        String mainClass = flinkParameters.getMainClass();
-        if (programType != ProgramType.PYTHON && 
StringUtils.isNotEmpty(mainClass)) {
-            args.add(FlinkConstants.FLINK_MAIN_CLASS);
-            args.add(flinkParameters.getMainClass());
-        }
-
-        ResourceInfo mainJar = flinkParameters.getMainJar();
-        if (mainJar != null) {
-            // -py
-            if(ProgramType.PYTHON == programType) {
-                args.add(FlinkConstants.FLINK_PYTHON);
-            }
-            args.add(mainJar.getRes());
-        }
-
-        String mainArgs = flinkParameters.getMainArgs();
-        if (StringUtils.isNotEmpty(mainArgs)) {
-            // combining local and global parameters
-            Map<String, Property> paramsMap = 
taskExecutionContext.getPrepareParamsMap();
-            args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, 
ParamUtils.convert(paramsMap)));
-        }
-
-        return args;
-    }
-
-    private void populateFlinkOnYarnOptions(List<String> args) {
-        // -m yarn-cluster
-        args.add(FlinkConstants.FLINK_RUN_MODE);
-        args.add(FlinkConstants.FLINK_YARN_CLUSTER);
-
-        // -ys
-        int slot = flinkParameters.getSlot();
-        if (slot > 0) {
-            args.add(FlinkConstants.FLINK_YARN_SLOT);
-            args.add(String.format("%d", slot));
-        }
-
-        // -ynm
-        String appName = flinkParameters.getAppName();
-        if (StringUtils.isNotEmpty(appName)) {
-            args.add(FlinkConstants.FLINK_APP_NAME);
-            args.add(ArgsUtils.escape(appName));
-        }
-
-        /**
-         * -yn
-         *
-         * Note: judge flink version, the parameter -yn has removed from flink 
1.10
-         */
-        String flinkVersion = flinkParameters.getFlinkVersion();
-        if (flinkVersion == null || 
FlinkConstants.FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
-            int taskManager = flinkParameters.getTaskManager();
-            if (taskManager > 0) {
-                args.add(FlinkConstants.FLINK_TASK_MANAGE);
-                args.add(String.format("%d", taskManager));
-            }
-        }
-
-        // -yjm
-        String jobManagerMemory = flinkParameters.getJobManagerMemory();
-        if (StringUtils.isNotEmpty(jobManagerMemory)) {
-            args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM);
-            args.add(jobManagerMemory);
-        }
-
-        // -ytm
-        String taskManagerMemory = flinkParameters.getTaskManagerMemory();
-        if (StringUtils.isNotEmpty(taskManagerMemory)) {
-            args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM);
-            args.add(taskManagerMemory);
-        }
-
-        // -yqu
-        String others = flinkParameters.getOthers();
-        if (StringUtils.isEmpty(others) || 
!others.contains(FlinkConstants.FLINK_QUEUE)) {
-            String queue = flinkParameters.getQueue();
-            if (StringUtils.isNotEmpty(queue)) {
-                args.add(FlinkConstants.FLINK_QUEUE);
-                args.add(queue);
-            }
-        }
-    }
-
-    /**
-     * build flink sql options
-     *
-     * @return argument list
-     */
-    private List<String> populateFlinkSqlOptions() {
-        List<String> args = new ArrayList<>();
-        List<String> defalutOptions = new ArrayList<>();
-
-        String deployMode = 
StringUtils.isNotEmpty(flinkParameters.getDeployMode()) ? 
flinkParameters.getDeployMode() : FlinkConstants.DEPLOY_MODE_CLUSTER;
-
-        /**
-         * Currently flink sql on yarn only supports yarn-per-job mode
-         */
-        if (!FlinkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
-            populateFlinkSqlOnYarnOptions(defalutOptions);
-        } else {
-            // execution.target
-            
defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, 
FlinkConstants.EXECUTION_TARGET_LOACL));
-        }
-
-        // parallelism.default
-        int parallelism = flinkParameters.getParallelism();
-        if (parallelism > 0) {
-            
defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_PARALLELISM_DEFAULT,
 parallelism));
-        }
-
-        // -i
-        args.add(FlinkConstants.FLINK_SQL_INIT_FILE);
-        args.add(generateInitScriptFile(StringUtils.join(defalutOptions, 
FlinkConstants.FLINK_SQL_NEWLINE).concat(FlinkConstants.FLINK_SQL_NEWLINE)));
-
-        // -f
-        args.add(FlinkConstants.FLINK_SQL_SCRIPT_FILE);
-        args.add(generateScriptFile());
-
-        String others = flinkParameters.getOthers();
-        if (StringUtils.isNotEmpty(others)) {
-            args.add(others);
-        }
-        return args;
-    }
-
-    private void populateFlinkSqlOnYarnOptions(List<String> defalutOptions) {
-        // execution.target
-        
defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, 
FlinkConstants.EXECUTION_TARGET_YARN_PER_JOB));
-
-        // taskmanager.numberOfTaskSlots
-        int slot = flinkParameters.getSlot();
-        if (slot > 0) {
-            
defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_NUMBEROFTASKSLOTS,
 slot));
-        }
-
-        // yarn.application.name
-        String appName = flinkParameters.getAppName();
-        if (StringUtils.isNotEmpty(appName)) {
-            
defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_NAME,
 ArgsUtils.escape(appName)));
-        }
-
-        // jobmanager.memory.process.size
-        String jobManagerMemory = flinkParameters.getJobManagerMemory();
-        if (StringUtils.isNotEmpty(jobManagerMemory)) {
-            
defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_JOBMANAGER_MEMORY_PROCESS_SIZE,
 jobManagerMemory));
-        }
-
-        // taskmanager.memory.process.size
-        String taskManagerMemory = flinkParameters.getTaskManagerMemory();
-        if (StringUtils.isNotEmpty(taskManagerMemory)) {
-            
defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_MEMORY_PROCESS_SIZE,
 taskManagerMemory));
-        }
-
-        // yarn.application.queue
-        String others = flinkParameters.getOthers();
-        if (StringUtils.isEmpty(others) || 
!others.contains(FlinkConstants.FLINK_QUEUE)) {
-            String queue = flinkParameters.getQueue();
-            if (StringUtils.isNotEmpty(queue)) {
-                
defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE,
 queue));
-            }
-        }
-    }
-
-    private String generateInitScriptFile(String parameters) {
-        String initScriptFileName = String.format("%s/%s_init.sql", 
taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
-
-        File file = new File(initScriptFileName);
-        Path path = file.toPath();
-
-        if (!Files.exists(path)) {
-            Set<PosixFilePermission> perms = 
PosixFilePermissions.fromString(RWXR_XR_X);
-            FileAttribute<Set<PosixFilePermission>> attr = 
PosixFilePermissions.asFileAttribute(perms);
-            try {
-                if (SystemUtils.IS_OS_WINDOWS) {
-                    Files.createFile(path);
-                } else {
-                    if (!file.getParentFile().exists()) {
-                        file.getParentFile().mkdirs();
-                    }
-                    Files.createFile(path, attr);
-                }
-
-                // Flink sql common parameters are written to the script file
-                logger.info("common parameters : {}", parameters);
-                Files.write(path, parameters.getBytes(), 
StandardOpenOption.APPEND);
-
-                // Flink init script is written to the script file
-                if (StringUtils.isNotEmpty(flinkParameters.getInitScript())) {
-                    String script = 
flinkParameters.getInitScript().replaceAll("\\r\\n", "\n");
-                    flinkParameters.setInitScript(script);
-                    logger.info("init script : {}", 
flinkParameters.getInitScript());
-                    Files.write(path, 
flinkParameters.getInitScript().getBytes(), StandardOpenOption.APPEND);
-                }
-            } catch (IOException e) {
-                throw new RuntimeException("generate flink sql script error", 
e);
-            }
-        }
-        return initScriptFileName;
-    }
-
-    private String generateScriptFile() {
-        String scriptFileName = String.format("%s/%s_node.sql", 
taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
-
-        File file = new File(scriptFileName);
-        Path path = file.toPath();
-
-        if (!Files.exists(path)) {
-            String script = 
flinkParameters.getRawScript().replaceAll("\\r\\n", "\n");
-            flinkParameters.setRawScript(script);
-
-            logger.info("raw script : {}", flinkParameters.getRawScript());
-            logger.info("task execute path : {}", 
taskExecutionContext.getExecutePath());
-
-            Set<PosixFilePermission> perms = 
PosixFilePermissions.fromString(RWXR_XR_X);
-            FileAttribute<Set<PosixFilePermission>> attr = 
PosixFilePermissions.asFileAttribute(perms);
-            try {
-                if (SystemUtils.IS_OS_WINDOWS) {
-                    Files.createFile(path);
-                } else {
-                    if (!file.getParentFile().exists()) {
-                        file.getParentFile().mkdirs();
-                    }
-                    Files.createFile(path, attr);
-                }
-                // Flink sql raw script is written to the script file
-                Files.write(path, flinkParameters.getRawScript().getBytes(), 
StandardOpenOption.APPEND);
-            } catch (IOException e) {
-                throw new RuntimeException("generate flink sql script error", 
e);
-            }
-        }
-        return scriptFileName;
-    }
-
     @Override
     protected void setMainJarName() {
         ResourceInfo mainJar = flinkParameters.getMainJar();
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
new file mode 100644
index 0000000000..b87960fe96
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class FlinkArgsUtilsTest {
+
+    private String joinStringListWithSpace(List<String> stringList) {
+        return String.join(" ", stringList);
+    }
+
+    private FlinkParameters 
buildTestFlinkParametersWithDeployMode(FlinkDeployMode flinkDeployMode) {
+        FlinkParameters flinkParameters = new FlinkParameters();
+        flinkParameters.setProgramType(ProgramType.SCALA);
+        flinkParameters.setDeployMode(flinkDeployMode);
+        flinkParameters.setParallelism(4);
+        ResourceInfo resourceInfo = new ResourceInfo();
+        resourceInfo.setId(1);
+        resourceInfo.setResourceName("job");
+        resourceInfo.setRes("/opt/job.jar");
+        flinkParameters.setMainJar(resourceInfo);
+        flinkParameters.setMainClass("org.example.Main");
+        flinkParameters.setSlot(4);
+        flinkParameters.setAppName("demo-app-name");
+        flinkParameters.setJobManagerMemory("1024m");
+        flinkParameters.setTaskManagerMemory("1024m");
+
+        return flinkParameters;
+    }
+    private TaskExecutionContext buildTestTaskExecutionContext() {
+        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+        taskExecutionContext.setTaskAppId("app-id");
+        taskExecutionContext.setExecutePath("/tmp/execution");
+        return taskExecutionContext;
+    }
+
+    @Test
+    public void testRunJarInApplicationMode() throws Exception {
+        FlinkParameters flinkParameters = 
buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION);
+        List<String> commandLine = 
FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
+
+        Assert.assertEquals(
+                "flink run-application -t yarn-application -ys 4 -ynm 
demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+                joinStringListWithSpace(commandLine));
+    }
+
+    @Test
+    public void testRunJarInClusterMode() throws Exception {
+        FlinkParameters flinkParameters = 
buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER);
+        flinkParameters.setFlinkVersion("1.11");
+        List<String> commandLine1 = 
FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
+
+        Assert.assertEquals(
+                "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m 
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+                joinStringListWithSpace(commandLine1));
+
+        flinkParameters.setFlinkVersion("<1.10");
+        List<String> commandLine2 = 
FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
+
+        Assert.assertEquals(
+                "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m 
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+                joinStringListWithSpace(commandLine2));
+
+        flinkParameters.setFlinkVersion(">=1.12");
+        List<String> commandLine3 = 
FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
+
+        Assert.assertEquals(
+                "flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m 
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+                joinStringListWithSpace(commandLine3));
+    }
+
+    @Test
+    public void testRunJarInLocalMode() throws Exception {
+        FlinkParameters flinkParameters = 
buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL);
+        List<String> commandLine = 
FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
+
+        Assert.assertEquals(
+                "flink run -p 4 -sae -c org.example.Main /opt/job.jar",
+                joinStringListWithSpace(commandLine));
+    }
+
+    @Test
+    public void testRunSql() throws Exception {
+        FlinkParameters flinkParameters = 
buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER);
+        flinkParameters.setProgramType(ProgramType.SQL);
+        List<String> commandLine = 
FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
+
+        Assert.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql 
-f /tmp/execution/app-id_node.sql",
+                joinStringListWithSpace(commandLine));
+    }
+
+    @Test
+    public void testInitOptionsInClusterMode() throws Exception {
+        List<String> initOptions = 
FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER));
+        Assert.assertEquals(2, initOptions.size());
+        Assert.assertTrue(initOptions.contains("set execution.target=local"));
+        Assert.assertTrue(initOptions.contains("set parallelism.default=4"));
+    }
+
+    @Test
+    public void testInitOptionsInApplicationMode() throws Exception {
+        List<String> initOptions = 
FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION));
+        Assert.assertEquals(6, initOptions.size());
+        Assert.assertTrue(initOptions.contains("set 
execution.target=yarn-per-job"));
+        Assert.assertTrue(initOptions.contains("set 
taskmanager.numberOfTaskSlots=4"));
+        Assert.assertTrue(initOptions.contains("set 
yarn.application.name=demo-app-name"));
+        Assert.assertTrue(initOptions.contains("set 
jobmanager.memory.process.size=1024m"));
+        Assert.assertTrue(initOptions.contains("set 
taskmanager.memory.process.size=1024m"));
+        Assert.assertTrue(initOptions.contains("set parallelism.default=4"));
+    }
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java
deleted file mode 100644
index fbb2d06951..0000000000
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.flink;
-
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
-import org.apache.dolphinscheduler.spi.utils.JSONUtils;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Collections;
-
-import static org.powermock.api.mockito.PowerMockito.spy;
-import static org.powermock.api.mockito.PowerMockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({
-    JSONUtils.class
-})
-@PowerMockIgnore({"javax.*"})
-public class FlinkTaskTest {
-
-    @Test
-    public void testBuildCommand() {
-        String parameters = buildFlinkParameters();
-        TaskExecutionContext taskExecutionContext = 
PowerMockito.mock(TaskExecutionContext.class);
-        when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
-        when(taskExecutionContext.getQueue()).thenReturn("default");
-        FlinkTask flinkTask = spy(new FlinkTask(taskExecutionContext));
-        flinkTask.init();
-        Assert.assertEquals(
-            "flink run " +
-                "-m yarn-cluster " +
-                "-ys 1 " +
-                "-ynm TopSpeedWindowing " +
-                "-yjm 1G " +
-                "-ytm 1G " +
-                "-yqu default " +
-                "-p 2 -sae " +
-                "-c 
org.apache.flink.streaming.examples.windowing.TopSpeedWindowing " +
-                "TopSpeedWindowing.jar", flinkTask.buildCommand());
-    }
-
-    @Test
-    public void testBuildCommandWithFlinkSql() {
-        String parameters = buildFlinkParametersWithFlinkSql();
-        TaskExecutionContext taskExecutionContext = 
PowerMockito.mock(TaskExecutionContext.class);
-        when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
-        when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
-        when(taskExecutionContext.getTaskAppId()).thenReturn("4483");
-        FlinkTask flinkTask = spy(new FlinkTask(taskExecutionContext));
-        flinkTask.init();
-        Assert.assertEquals("sql-client.sh -i /tmp/4483_init.sql -f 
/tmp/4483_node.sql", flinkTask.buildCommand());
-    }
-
-    private String buildFlinkParameters() {
-        ResourceInfo resource = new ResourceInfo();
-        resource.setId(2);
-        resource.setResourceName("/TopSpeedWindowing.jar");
-        resource.setRes("TopSpeedWindowing.jar");
-
-        FlinkParameters parameters = new FlinkParameters();
-        parameters.setLocalParams(Collections.emptyList());
-        parameters.setResourceList(Collections.emptyList());
-        parameters.setProgramType(ProgramType.JAVA);
-        
parameters.setMainClass("org.apache.flink.streaming.examples.windowing.TopSpeedWindowing");
-        parameters.setMainJar(resource);
-        parameters.setDeployMode("cluster");
-        parameters.setAppName("TopSpeedWindowing");
-        parameters.setFlinkVersion(">=1.10");
-        parameters.setJobManagerMemory("1G");
-        parameters.setTaskManagerMemory("1G");
-        parameters.setSlot(1);
-        parameters.setTaskManager(2);
-        parameters.setParallelism(2);
-        return JSONUtils.toJsonString(parameters);
-    }
-
-    private String buildFlinkParametersWithFlinkSql() {
-        FlinkParameters parameters = new FlinkParameters();
-        parameters.setLocalParams(Collections.emptyList());
-        parameters.setInitScript("set 
sql-client.execution.result-mode=tableau;");
-        parameters.setRawScript("selcet 11111;");
-        parameters.setProgramType(ProgramType.SQL);
-        parameters.setMainClass("");
-        parameters.setDeployMode("cluster");
-        parameters.setAppName("FlinkSQL");
-        parameters.setOthers("");
-        parameters.setJobManagerMemory("1G");
-        parameters.setTaskManagerMemory("1G");
-        parameters.setParallelism(1);
-        parameters.setFlinkVersion(">=1.10");
-        return JSONUtils.toJsonString(parameters);
-    }
-}
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
index 568d126ea2..7b61477bc5 100644
--- 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import { computed, ref, watchEffect } from 'vue'
+import { computed, watch, watchEffect } from 'vue'
 import { useI18n } from 'vue-i18n'
-import { useCustomParams, useDeployMode, useMainJar, useResources } from '.'
+import { useCustomParams, useMainJar, useResources } from '.'
 import type { IJsonItem } from '../types'
 
 export function useFlink(model: { [field: string]: any }): IJsonItem[] {
@@ -36,14 +36,53 @@ export function useFlink(model: { [field: string]: any }): 
IJsonItem[] {
   )
 
   const taskManagerNumberSpan = computed(() =>
-    model.flinkVersion === '<1.10' && model.deployMode === 'cluster' ? 12 : 0
+    model.flinkVersion === '<1.10' && model.deployMode !== 'local' ? 12 : 0
   )
 
   const deployModeSpan = computed(() =>
-    model.deployMode === 'cluster' ? 12 : 0
+    model.deployMode !== 'local' ? 12 : 0
   )
 
-  const appNameSpan = computed(() => (model.deployMode === 'cluster' ? 24 : 0))
+  const appNameSpan = computed(() => (model.deployMode !== 'local' ? 24 : 0))
+
+  const deployModeOptions = computed(() => {
+    if (model.flinkVersion === '<1.10') {
+      return [
+        {
+          label: 'cluster',
+          value: 'cluster'
+        },
+        {
+          label: 'local',
+          value: 'local'
+        }
+      ];
+    } else {
+      return [
+        {
+          label: 'per-job/cluster',
+          value: 'cluster'
+        },
+        {
+          label: 'application',
+          value: 'application'
+        },
+        {
+          label: 'local',
+          value: 'local'
+        }
+      ];
+    }
+  })
+
+  watch(
+    () => model.flinkVersion,
+    () => {
+      if (model.flinkVersion === '<1.10' && model.deployMode === 
'application') {
+        model.deployMode = 'cluster'
+      }
+    }
+  )
 
   watchEffect(() => {
     model.flinkVersion = model.programType === 'SQL' ? '>=1.13' : '<1.10'
@@ -86,7 +125,13 @@ export function useFlink(model: { [field: string]: any }): 
IJsonItem[] {
       }
     },
     useMainJar(model),
-    useDeployMode(24, ref(false)),
+    {
+      type: 'radio',
+      field: 'deployMode',
+      name: t('project.node.deploy_mode'),
+      options: deployModeOptions,
+      span: 24
+    },
     {
       type: 'editor',
       field: 'initScript',
@@ -269,7 +314,11 @@ const FLINK_VERSIONS = [
     value: '<1.10'
   },
   {
-    label: '>=1.10',
-    value: '>=1.10'
+    label: '1.11',
+    value: '1.11'
+  },
+  {
+    label: '>=1.12',
+    value: '>=1.12'
   }
 ]

Reply via email to