This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 3.1.8-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.8-prepare by this push:
     new 87481aa7e0 [bug][plugin]Fix: Correct the way to determine the yarn queue in Flink CommandLine and SQL mode (#14237)
87481aa7e0 is described below

commit 87481aa7e01f49c91ab2ffb2dabc71a5a8f823b3
Author: ORuteMa <[email protected]>
AuthorDate: Mon Jun 12 11:04:39 2023 +0800

    [bug][plugin]Fix: Correct the way to determine the yarn queue in Flink CommandLine and SQL mode (#14237)
    
    * Fix: Correct the way to determine the yarn queue in Flink CommandLine
    
    * fix the yarn queue in sql mode && refine the code
    
    * refine code
    
    * remove unnecessary comment
    
    * fix yarn queue properties
    
    * remove redundant variable
---
 .../plugin/task/flink/FlinkArgsUtils.java          | 50 ++++++++++++++++------
 .../plugin/task/flink/FlinkConstants.java          |  3 +-
 2 files changed, 39 insertions(+), 14 deletions(-)

diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
index 69f7465a75..782b2f2f97 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
@@ -156,12 +156,9 @@ public class FlinkArgsUtils {
             }
 
             // yarn.application.queue
-            String others = flinkParameters.getOthers();
-            if (StringUtils.isEmpty(others) || 
!others.contains(FlinkConstants.FLINK_QUEUE)) {
-                String queue = flinkParameters.getQueue();
-                if (StringUtils.isNotEmpty(queue)) {
-                    
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE,
 queue));
-                }
+            String queue = flinkParameters.getQueue();
+            if (StringUtils.isNotEmpty(queue)) {
+                
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE,
 queue));
             }
         }
 
@@ -241,13 +238,6 @@ public class FlinkArgsUtils {
                     args.add(taskManagerMemory);
                 }
 
-                if (StringUtils.isEmpty(others) || 
!others.contains(FlinkConstants.FLINK_QUEUE)) {
-                    String queue = flinkParameters.getQueue();
-                    if (StringUtils.isNotEmpty(queue)) { // -yqu
-                        args.add(FlinkConstants.FLINK_QUEUE);
-                        args.add(queue);
-                    }
-                }
                 break;
             case LOCAL:
                 break;
@@ -290,6 +280,40 @@ public class FlinkArgsUtils {
             args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, 
ParamUtils.convert(paramsMap)));
         }
 
+        // determine yarn queue
+        determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion);
         return args;
     }
+
+    /**
+     * Appends the YARN queue option appropriate for the deploy mode and Flink version.
+     *
+     * <ul>
+     *   <li>CLUSTER with a 1.12/1.13-or-later version constant, and APPLICATION: uses
+     *       {@link FlinkConstants#FLINK_QUEUE_FOR_TARGETS}.</li>
+     *   <li>CLUSTER with any other version: uses {@link FlinkConstants#FLINK_QUEUE_FOR_MODE}.</li>
+     *   <li>Any other deploy mode: nothing is added.</li>
+     * </ul>
+     *
+     * @param args            command-line argument list to append to
+     * @param flinkParameters task parameters supplying the queue and the user's extra options
+     * @param deployMode      deploy mode that selects the option style
+     * @param flinkVersion    Flink version string compared against the version constants
+     */
+    private static void determinedYarnQueue(List<String> args, FlinkParameters flinkParameters,
+                                            FlinkDeployMode deployMode, String flinkVersion) {
+        switch (deployMode) {
+            case CLUSTER:
+                if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion)
+                        || FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) {
+                    doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS);
+                } else {
+                    doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_MODE);
+                }
+                // Must not fall through: APPLICATION would add the queue a second time.
+                break;
+            case APPLICATION:
+                doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS);
+                break;
+            default:
+                // Remaining deploy modes get no queue option.
+                break;
+        }
+    }
+
+    /**
+     * Appends the configured queue to {@code args} in the syntax of the given option.
+     *
+     * <p>Nothing is added when the user's "others" string already contains the option,
+     * or when no queue is configured.
+     *
+     * @param args            command-line argument list to append to
+     * @param flinkParameters task parameters supplying the queue and the user's extra options
+     * @param option          either {@link FlinkConstants#FLINK_QUEUE_FOR_TARGETS} or
+     *                        {@link FlinkConstants#FLINK_QUEUE_FOR_MODE}; other values add nothing
+     */
+    private static void doAddQueue(List<String> args, FlinkParameters flinkParameters, String option) {
+        String others = flinkParameters.getOthers();
+        if (StringUtils.isEmpty(others) || !others.contains(option)) {
+            String queue = flinkParameters.getQueue();
+            if (StringUtils.isNotEmpty(queue)) {
+                switch (option) {
+                    case FlinkConstants.FLINK_QUEUE_FOR_TARGETS:
+                        // Single "key=value" token.
+                        args.add(String.format(FlinkConstants.FLINK_QUEUE_FOR_TARGETS + "=%s", queue));
+                        // Must not fall through: that would also append the FLINK_QUEUE_FOR_MODE form.
+                        break;
+                    case FlinkConstants.FLINK_QUEUE_FOR_MODE:
+                        // Flag and value as two separate tokens.
+                        args.add(FlinkConstants.FLINK_QUEUE_FOR_MODE);
+                        args.add(queue);
+                        break;
+                    default:
+                        // Unknown option style: add nothing.
+                        break;
+                }
+            }
+        }
+    }
+
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
index de513e0b0d..6e8d51b2ce 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
@@ -48,7 +48,8 @@ public class FlinkConstants {
     public static final String FLINK_EXECUTION_TARGET = "-t";
     public static final String FLINK_YARN_SLOT = "-ys";
     public static final String FLINK_APP_NAME = "-ynm";
-    public static final String FLINK_QUEUE = "-yqu";
+    public static final String FLINK_QUEUE_FOR_MODE = "-yqu";
+    public static final String FLINK_QUEUE_FOR_TARGETS = 
"-Dyarn.application.queue";
     public static final String FLINK_TASK_MANAGE = "-yn";
     public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
     public static final String FLINK_TASK_MANAGE_MEM = "-ytm";

Reply via email to