This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new e748c2eb9a fix the #14729 problem (#14902)
e748c2eb9a is described below

commit e748c2eb9a8e0e8a365d58a91b5c24db837c8667
Author: LiuCanWu <[email protected]>
AuthorDate: Thu Sep 21 18:46:14 2023 +0800

    fix the #14729 problem (#14902)
    
    Co-authored-by: 刘阳 <[email protected]>
    Co-authored-by: xiangzihao <[email protected]>
---
 .../apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
index 6937db6b78..6d28ffcc6d 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
@@ -273,6 +273,8 @@ public class FlinkArgsUtils {
             args.add(others);
         }
 
+        // determine yarn queue
+        determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion);
         ProgramType programType = flinkParameters.getProgramType();
         String mainClass = flinkParameters.getMainClass();
         if (programType != null && programType != ProgramType.PYTHON && 
StringUtils.isNotEmpty(mainClass)) {
@@ -295,8 +297,6 @@ public class FlinkArgsUtils {
             args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, 
ParameterUtils.convert(paramsMap)));
         }
 
-        // determine yarn queue
-        determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion);
         return args;
     }
 
@@ -310,8 +310,10 @@ public class FlinkArgsUtils {
                 } else {
                     doAddQueue(args, flinkParameters, 
FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE);
                 }
+                break;
             case APPLICATION:
                 doAddQueue(args, flinkParameters, 
FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS);
+                break;
         }
     }
 
@@ -323,9 +325,11 @@ public class FlinkArgsUtils {
                 switch (option) {
                     case FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS:
                         
args.add(String.format(FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS + "=%s", 
yarnQueue));
+                        break;
                     case FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE:
                         args.add(FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE);
                         args.add(yarnQueue);
+                        break;
                 }
             }
         }

Reply via email to