This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 3.1.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.2-prepare by this push:
     new a87206228a [Bug-13010] [Task] The Flink SQL task page selects the 
pre-job deployment mode, but the task executed by the worker is the Flink local 
mode
a87206228a is described below

commit a87206228a1b07db65a92acadd04048949ad7e60
Author: Kerwin <[email protected]>
AuthorDate: Mon Nov 28 10:09:49 2022 +0800

    [Bug-13010] [Task] The Flink SQL task page selects the pre-job deployment 
mode, but the task executed by the worker is the Flink local mode
---
 .../plugin/task/flink/FlinkArgsUtilsTest.java      | 46 +++++++++++-----------
 .../plugin/task/flink/FlinkArgsUtils.java          |  4 +-
 .../plugin/task/flink/FlinkConstants.java          |  1 +
 .../plugin/task/flink/FlinkArgsUtilsTest.java      | 46 +++++++++++-----------
 .../task/components/node/fields/use-flink.ts       | 12 ++++++
 5 files changed, 63 insertions(+), 46 deletions(-)

diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
index cbd9852e2e..5a42755c1f 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
@@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.flink;
 
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
 import java.util.List;
 
@@ -60,7 +60,7 @@ public class FlinkArgsUtilsTest {
         FlinkStreamParameters flinkParameters = 
buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION);
         List<String> commandLine = 
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
 
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 "flink run-application -t yarn-application -ys 4 -ynm 
demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine));
     }
@@ -71,21 +71,21 @@ public class FlinkArgsUtilsTest {
         flinkParameters.setFlinkVersion("1.11");
         List<String> commandLine1 = 
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
 
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m 
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine1));
 
         flinkParameters.setFlinkVersion("<1.10");
         List<String> commandLine2 = 
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
 
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m 
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine2));
 
         flinkParameters.setFlinkVersion(">=1.12");
         List<String> commandLine3 = 
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
 
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 "flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m 
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine3));
     }
@@ -95,7 +95,7 @@ public class FlinkArgsUtilsTest {
         FlinkStreamParameters flinkParameters = 
buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL);
         List<String> commandLine = 
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
 
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 "flink run -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine));
     }
@@ -106,27 +106,29 @@ public class FlinkArgsUtilsTest {
         flinkParameters.setProgramType(ProgramType.SQL);
         List<String> commandLine = 
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
 
-        Assert.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql 
-f /tmp/execution/app-id_node.sql",
+        Assertions.assertEquals("sql-client.sh -i 
/tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
                 joinStringListWithSpace(commandLine));
     }
 
     @Test
-    public void testInitOptionsInClusterMode() throws Exception {
-        List<String> initOptions = 
FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER));
-        Assert.assertEquals(2, initOptions.size());
-        Assert.assertTrue(initOptions.contains("set execution.target=local"));
-        Assert.assertTrue(initOptions.contains("set parallelism.default=4"));
+    public void testInitOptionsInLocalMode() throws Exception {
+        List<String> initOptions =
+                
FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL));
+        Assertions.assertEquals(2, initOptions.size());
+        Assertions.assertTrue(initOptions.contains("set 
execution.target=local"));
+        Assertions.assertTrue(initOptions.contains("set 
parallelism.default=4"));
     }
 
     @Test
-    public void testInitOptionsInApplicationMode() throws Exception {
-        List<String> initOptions = 
FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION));
-        Assert.assertEquals(6, initOptions.size());
-        Assert.assertTrue(initOptions.contains("set 
execution.target=yarn-per-job"));
-        Assert.assertTrue(initOptions.contains("set 
taskmanager.numberOfTaskSlots=4"));
-        Assert.assertTrue(initOptions.contains("set 
yarn.application.name=demo-app-name"));
-        Assert.assertTrue(initOptions.contains("set 
jobmanager.memory.process.size=1024m"));
-        Assert.assertTrue(initOptions.contains("set 
taskmanager.memory.process.size=1024m"));
-        Assert.assertTrue(initOptions.contains("set parallelism.default=4"));
+    public void testInitOptionsInClusterMode() throws Exception {
+        List<String> initOptions = FlinkArgsUtils
+                
.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER));
+        Assertions.assertEquals(6, initOptions.size());
+        Assertions.assertTrue(initOptions.contains("set 
execution.target=yarn-per-job"));
+        Assertions.assertTrue(initOptions.contains("set 
taskmanager.numberOfTaskSlots=4"));
+        Assertions.assertTrue(initOptions.contains("set 
yarn.application.name=demo-app-name"));
+        Assertions.assertTrue(initOptions.contains("set 
jobmanager.memory.process.size=1024m"));
+        Assertions.assertTrue(initOptions.contains("set 
taskmanager.memory.process.size=1024m"));
+        Assertions.assertTrue(initOptions.contains("set 
parallelism.default=4"));
     }
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
index a231b67034..69f7465a75 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
@@ -124,9 +124,9 @@ public class FlinkArgsUtils {
         /**
          * Currently flink sql on yarn only supports yarn-per-job mode
          */
-        if (FlinkDeployMode.CLUSTER == deployMode) {
+        if (FlinkDeployMode.LOCAL == deployMode) {
             // execution.target
-            
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, 
"local"));
+            
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, 
FlinkConstants.FLINK_LOCAL));
         } else {
             // execution.target
             
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, 
FlinkConstants.FLINK_YARN_PER_JOB));
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
index e448596370..de513e0b0d 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
@@ -43,6 +43,7 @@ public class FlinkConstants {
     public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
     public static final String FLINK_YARN_APPLICATION = "yarn-application";
     public static final String FLINK_YARN_PER_JOB = "yarn-per-job";
+    public static final String FLINK_LOCAL = "local";
     public static final String FLINK_RUN_MODE = "-m";
     public static final String FLINK_EXECUTION_TARGET = "-t";
     public static final String FLINK_YARN_SLOT = "-ys";
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
index f71afb4d0e..cdedae8da8 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
@@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.flink;
 
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
 import java.util.List;
 
@@ -60,7 +60,7 @@ public class FlinkArgsUtilsTest {
         FlinkParameters flinkParameters = 
buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION);
         List<String> commandLine = 
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
 
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 "flink run-application -t yarn-application -ys 4 -ynm 
demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine));
     }
@@ -71,21 +71,21 @@ public class FlinkArgsUtilsTest {
         flinkParameters.setFlinkVersion("1.11");
         List<String> commandLine1 = 
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
 
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m 
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine1));
 
         flinkParameters.setFlinkVersion("<1.10");
         List<String> commandLine2 = 
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
 
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m 
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine2));
 
         flinkParameters.setFlinkVersion(">=1.12");
         List<String> commandLine3 = 
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
 
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 "flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m 
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine3));
     }
@@ -95,7 +95,7 @@ public class FlinkArgsUtilsTest {
         FlinkParameters flinkParameters = 
buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL);
         List<String> commandLine = 
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
 
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 "flink run -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine));
     }
@@ -106,27 +106,29 @@ public class FlinkArgsUtilsTest {
         flinkParameters.setProgramType(ProgramType.SQL);
         List<String> commandLine = 
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), 
flinkParameters);
 
-        Assert.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql 
-f /tmp/execution/app-id_node.sql",
+        Assertions.assertEquals("sql-client.sh -i 
/tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
                 joinStringListWithSpace(commandLine));
     }
 
     @Test
-    public void testInitOptionsInClusterMode() throws Exception {
-        List<String> initOptions = 
FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER));
-        Assert.assertEquals(2, initOptions.size());
-        Assert.assertTrue(initOptions.contains("set execution.target=local"));
-        Assert.assertTrue(initOptions.contains("set parallelism.default=4"));
+    public void testInitOptionsInLocalMode() throws Exception {
+        List<String> initOptions =
+                
FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL));
+        Assertions.assertEquals(2, initOptions.size());
+        Assertions.assertTrue(initOptions.contains("set 
execution.target=local"));
+        Assertions.assertTrue(initOptions.contains("set 
parallelism.default=4"));
     }
 
     @Test
-    public void testInitOptionsInApplicationMode() throws Exception {
-        List<String> initOptions = 
FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION));
-        Assert.assertEquals(6, initOptions.size());
-        Assert.assertTrue(initOptions.contains("set 
execution.target=yarn-per-job"));
-        Assert.assertTrue(initOptions.contains("set 
taskmanager.numberOfTaskSlots=4"));
-        Assert.assertTrue(initOptions.contains("set 
yarn.application.name=demo-app-name"));
-        Assert.assertTrue(initOptions.contains("set 
jobmanager.memory.process.size=1024m"));
-        Assert.assertTrue(initOptions.contains("set 
taskmanager.memory.process.size=1024m"));
-        Assert.assertTrue(initOptions.contains("set parallelism.default=4"));
+    public void testInitOptionsInClusterMode() throws Exception {
+        List<String> initOptions = FlinkArgsUtils
+                
.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER));
+        Assertions.assertEquals(6, initOptions.size());
+        Assertions.assertTrue(initOptions.contains("set 
execution.target=yarn-per-job"));
+        Assertions.assertTrue(initOptions.contains("set 
taskmanager.numberOfTaskSlots=4"));
+        Assertions.assertTrue(initOptions.contains("set 
yarn.application.name=demo-app-name"));
+        Assertions.assertTrue(initOptions.contains("set 
jobmanager.memory.process.size=1024m"));
+        Assertions.assertTrue(initOptions.contains("set 
taskmanager.memory.process.size=1024m"));
+        Assertions.assertTrue(initOptions.contains("set 
parallelism.default=4"));
     }
 }
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
index 89035f0441..d987d5ff56 100644
--- 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
@@ -44,6 +44,18 @@ export function useFlink(model: { [field: string]: any }): 
IJsonItem[] {
   const appNameSpan = computed(() => (model.deployMode !== 'local' ? 24 : 0))
 
   const deployModeOptions = computed(() => {
+    if (model.programType === 'SQL') {
+      return [
+        {
+          label: 'per-job/cluster',
+          value: 'cluster'
+        },
+        {
+          label: 'local',
+          value: 'local'
+        }
+      ]
+    }
     if (model.flinkVersion === '<1.10') {
       return [
         {

Reply via email to