This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 3.1.9-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 28e74e41b9361ac6075a808bade3ee3655feeecf
Author: LiuCanWu <[email protected]>
AuthorDate: Thu Sep 21 18:46:14 2023 +0800

    cherry-pick [Fix-14729] fix problem with the command generated by the flink 
task #14902
---
 .../plugin/task/flink/FlinkArgsUtils.java          | 28 ++++++++++++----------
 .../plugin/task/flink/FlinkConstants.java          |  4 ++--
 2 files changed, 18 insertions(+), 14 deletions(-)

diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
index 782b2f2f97..45590d3598 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
@@ -258,6 +258,8 @@ public class FlinkArgsUtils {
             args.add(others);
         }
 
+        // determine yarn queue
+        determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion);
         ProgramType programType = flinkParameters.getProgramType();
         String mainClass = flinkParameters.getMainClass();
         if (programType != null && programType != ProgramType.PYTHON && 
StringUtils.isNotEmpty(mainClass)) {
@@ -280,8 +282,6 @@ public class FlinkArgsUtils {
             args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, 
ParamUtils.convert(paramsMap)));
         }
 
-        // determine yarn queue
-        determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion);
         return args;
     }
 
@@ -291,26 +291,30 @@ public class FlinkArgsUtils {
             case CLUSTER:
                 if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion)
                         || 
FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) {
-                    doAddQueue(args, flinkParameters, 
FlinkConstants.FLINK_QUEUE_FOR_TARGETS);
+                    doAddQueue(args, flinkParameters, 
FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS);
                 } else {
-                    doAddQueue(args, flinkParameters, 
FlinkConstants.FLINK_QUEUE_FOR_MODE);
+                    doAddQueue(args, flinkParameters, 
FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE);
                 }
+                break;
             case APPLICATION:
-                doAddQueue(args, flinkParameters, 
FlinkConstants.FLINK_QUEUE_FOR_TARGETS);
+                doAddQueue(args, flinkParameters, 
FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS);
+                break;
         }
     }
 
     private static void doAddQueue(List<String> args, FlinkParameters 
flinkParameters, String option) {
         String others = flinkParameters.getOthers();
         if (StringUtils.isEmpty(others) || !others.contains(option)) {
-            String queue = flinkParameters.getQueue();
-            if (StringUtils.isNotEmpty(queue)) {
+            String yarnQueue = flinkParameters.getQueue();
+            if (StringUtils.isNotEmpty(yarnQueue)) {
                 switch (option) {
-                    case FlinkConstants.FLINK_QUEUE_FOR_TARGETS:
-                        
args.add(String.format(FlinkConstants.FLINK_QUEUE_FOR_TARGETS + "=%s", queue));
-                    case FlinkConstants.FLINK_QUEUE_FOR_MODE:
-                        args.add(FlinkConstants.FLINK_QUEUE_FOR_MODE);
-                        args.add(queue);
+                    case FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS:
+                        
args.add(String.format(FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS + "=%s", 
yarnQueue));
+                        break;
+                    case FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE:
+                        args.add(FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE);
+                        args.add(yarnQueue);
+                        break;
                 }
             }
         }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
index 6e8d51b2ce..b2d7607761 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
@@ -48,8 +48,8 @@ public class FlinkConstants {
     public static final String FLINK_EXECUTION_TARGET = "-t";
     public static final String FLINK_YARN_SLOT = "-ys";
     public static final String FLINK_APP_NAME = "-ynm";
-    public static final String FLINK_QUEUE_FOR_MODE = "-yqu";
-    public static final String FLINK_QUEUE_FOR_TARGETS = 
"-Dyarn.application.queue";
+    public static final String FLINK_YARN_QUEUE_FOR_MODE = "-yqu";
+    public static final String FLINK_YARN_QUEUE_FOR_TARGETS = 
"-Dyarn.application.queue";
     public static final String FLINK_TASK_MANAGE = "-yn";
     public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
     public static final String FLINK_TASK_MANAGE_MEM = "-ytm";

Reply via email to