This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new ae670e6e1c [Bug-13010] [Task] The Flink SQL task page selects the
per-job deployment mode, but the task executed by the worker is the Flink local
mode (#13011)
ae670e6e1c is described below
commit ae670e6e1c0c0bcd645ab3e1776eca75b9765b8c
Author: Kerwin <[email protected]>
AuthorDate: Mon Nov 28 10:09:49 2022 +0800
[Bug-13010] [Task] The Flink SQL task page selects the per-job deployment
mode, but the task executed by the worker is the Flink local mode (#13011)
---
.../plugin/task/flink/FlinkArgsUtilsTest.java | 8 ++++----
.../dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java | 4 ++--
.../dolphinscheduler/plugin/task/flink/FlinkConstants.java | 1 +
.../plugin/task/flink/FlinkArgsUtilsTest.java | 8 ++++----
.../views/projects/task/components/node/fields/use-flink.ts | 12 ++++++++++++
5 files changed, 23 insertions(+), 10 deletions(-)
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
index 8d3f83b622..e8d29df4be 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
@@ -115,18 +115,18 @@ public class FlinkArgsUtilsTest {
}
@Test
- public void testInitOptionsInClusterMode() throws Exception {
+ public void testInitOptionsInLocalMode() throws Exception {
List<String> initOptions =
-
FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER));
+
FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL));
Assertions.assertEquals(2, initOptions.size());
Assertions.assertTrue(initOptions.contains("set
execution.target=local"));
Assertions.assertTrue(initOptions.contains("set
parallelism.default=4"));
}
@Test
- public void testInitOptionsInApplicationMode() throws Exception {
+ public void testInitOptionsInClusterMode() throws Exception {
List<String> initOptions = FlinkArgsUtils
-
.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION));
+
.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER));
Assertions.assertEquals(6, initOptions.size());
Assertions.assertTrue(initOptions.contains("set
execution.target=yarn-per-job"));
Assertions.assertTrue(initOptions.contains("set
taskmanager.numberOfTaskSlots=4"));
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
index 5e63a32bba..7112d0e9e7 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
@@ -126,9 +126,9 @@ public class FlinkArgsUtils {
/**
* Currently flink sql on yarn only supports yarn-per-job mode
*/
- if (FlinkDeployMode.CLUSTER == deployMode) {
+ if (FlinkDeployMode.LOCAL == deployMode) {
// execution.target
-
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET,
"local"));
+
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET,
FlinkConstants.FLINK_LOCAL));
} else {
// execution.target
initOptions.add(
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
index e448596370..de513e0b0d 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
@@ -43,6 +43,7 @@ public class FlinkConstants {
public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
public static final String FLINK_YARN_APPLICATION = "yarn-application";
public static final String FLINK_YARN_PER_JOB = "yarn-per-job";
+ public static final String FLINK_LOCAL = "local";
public static final String FLINK_RUN_MODE = "-m";
public static final String FLINK_EXECUTION_TARGET = "-t";
public static final String FLINK_YARN_SLOT = "-ys";
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
index 62148521aa..952e0c42ed 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
@@ -115,18 +115,18 @@ public class FlinkArgsUtilsTest {
}
@Test
- public void testInitOptionsInClusterMode() throws Exception {
+ public void testInitOptionsInLocalMode() throws Exception {
List<String> initOptions =
-
FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER));
+
FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL));
Assertions.assertEquals(2, initOptions.size());
Assertions.assertTrue(initOptions.contains("set
execution.target=local"));
Assertions.assertTrue(initOptions.contains("set
parallelism.default=4"));
}
@Test
- public void testInitOptionsInApplicationMode() throws Exception {
+ public void testInitOptionsInClusterMode() throws Exception {
List<String> initOptions = FlinkArgsUtils
-
.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION));
+
.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER));
Assertions.assertEquals(6, initOptions.size());
Assertions.assertTrue(initOptions.contains("set
execution.target=yarn-per-job"));
Assertions.assertTrue(initOptions.contains("set
taskmanager.numberOfTaskSlots=4"));
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
index 89035f0441..d987d5ff56 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
@@ -44,6 +44,18 @@ export function useFlink(model: { [field: string]: any }):
IJsonItem[] {
const appNameSpan = computed(() => (model.deployMode !== 'local' ? 24 : 0))
const deployModeOptions = computed(() => {
+ if (model.programType === 'SQL') {
+ return [
+ {
+ label: 'per-job/cluster',
+ value: 'cluster'
+ },
+ {
+ label: 'local',
+ value: 'local'
+ }
+ ]
+ }
if (model.flinkVersion === '<1.10') {
return [
{