This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new e748c2eb9a fix the #14729 problem (#14902)
e748c2eb9a is described below
commit e748c2eb9a8e0e8a365d58a91b5c24db837c8667
Author: LiuCanWu <[email protected]>
AuthorDate: Thu Sep 21 18:46:14 2023 +0800
fix the #14729 problem (#14902)
Co-authored-by: 刘阳 <[email protected]>
Co-authored-by: xiangzihao <[email protected]>
---
.../apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
index 6937db6b78..6d28ffcc6d 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
@@ -273,6 +273,8 @@ public class FlinkArgsUtils {
args.add(others);
}
+ // determine yarn queue
+ determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion);
ProgramType programType = flinkParameters.getProgramType();
String mainClass = flinkParameters.getMainClass();
if (programType != null && programType != ProgramType.PYTHON &&
StringUtils.isNotEmpty(mainClass)) {
@@ -295,8 +297,6 @@ public class FlinkArgsUtils {
args.add(ParameterUtils.convertParameterPlaceholders(mainArgs,
ParameterUtils.convert(paramsMap)));
}
- // determine yarn queue
- determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion);
return args;
}
@@ -310,8 +310,10 @@ public class FlinkArgsUtils {
} else {
doAddQueue(args, flinkParameters,
FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE);
}
+ break;
case APPLICATION:
doAddQueue(args, flinkParameters,
FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS);
+ break;
}
}
@@ -323,9 +325,11 @@ public class FlinkArgsUtils {
switch (option) {
case FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS:
args.add(String.format(FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS + "=%s",
yarnQueue));
+ break;
case FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE:
args.add(FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE);
args.add(yarnQueue);
+ break;
}
}
}