This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch 1.3.6-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/1.3.6-prepare by this push:
new 505da21 [1.3.6-prepare][Feature][Flink] Support name and parallelism
input #4285 #4937 (#4976)
505da21 is described below
commit 505da21fc262b18cfd0767f9e72f02b6280f7344
Author: Shiwen Cheng <[email protected]>
AuthorDate: Sat Mar 6 23:11:36 2021 +0800
[1.3.6-prepare][Feature][Flink] Support name and parallelism input #4285
#4937 (#4976)
---
.../apache/dolphinscheduler/common/Constants.java | 3 +-
.../common/task/flink/FlinkParameters.java | 27 ++++++++++----
.../server/utils/FlinkArgsUtils.java | 20 ++++++----
.../server/utils/FlinkArgsUtilsTest.java | 32 +++++++++-------
.../pages/dag/_source/formModel/tasks/flink.vue | 43 +++++++++++++++++++++-
.../src/js/module/i18n/locale/en_US.js | 4 ++
.../src/js/module/i18n/locale/zh_CN.js | 4 ++
7 files changed, 104 insertions(+), 29 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index f8ba887..5dc6aad 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -828,8 +828,9 @@ public final class Constants {
public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
public static final String FLINK_TASK_MANAGE_MEM = "-ytm";
- public static final String FLINK_DETACH = "-d";
public static final String FLINK_MAIN_CLASS = "-c";
+ public static final String FLINK_PARALLELISM = "-p";
+ public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae";
public static final int[] NOT_TERMINATED_STATES = new int[]{
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
index 231dd33..82613f2 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.common.task.flink;
import org.apache.dolphinscheduler.common.enums.ProgramType;
@@ -39,8 +40,8 @@ public class FlinkParameters extends AbstractParameters {
private String mainClass;
/**
- * deploy mode yarn-cluster yarn-client yarn-local
- */
+ * deploy mode yarn-cluster yarn-local
+ */
private String deployMode;
/**
@@ -54,25 +55,29 @@ public class FlinkParameters extends AbstractParameters {
private int slot;
/**
- *Yarn application name
+ * parallelism
*/
+ private int parallelism;
+ /**
+ * yarn application name
+ */
private String appName;
/**
* taskManager count
*/
- private int taskManager;
+ private int taskManager;
/**
* job manager memory
*/
- private String jobManagerMemory ;
+ private String jobManagerMemory;
/**
* task manager memory
*/
- private String taskManagerMemory;
+ private String taskManagerMemory;
/**
* resource list
@@ -140,6 +145,14 @@ public class FlinkParameters extends AbstractParameters {
this.slot = slot;
}
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ public void setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ }
+
public String getAppName() {
return appName;
}
@@ -217,7 +230,6 @@ public class FlinkParameters extends AbstractParameters {
return mainJar != null && programType != null;
}
-
@Override
public List<ResourceInfo> getResourceFilesList() {
if (mainJar != null && !resourceList.contains(mainJar)) {
@@ -225,4 +237,5 @@ public class FlinkParameters extends AbstractParameters {
}
return resourceList;
}
+
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
index 519ddf2..8b3b691 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
@@ -17,11 +17,11 @@
package org.apache.dolphinscheduler.server.utils;
-import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
@@ -53,7 +53,7 @@ public class FlinkArgsUtils {
args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster
int slot = param.getSlot();
- if (slot != 0) {
+ if (slot > 0) {
args.add(Constants.FLINK_YARN_SLOT);
args.add(String.format("%d", slot)); //-ys
}
@@ -68,7 +68,7 @@ public class FlinkArgsUtils {
String flinkVersion = param.getFlinkVersion();
if (flinkVersion == null ||
FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
int taskManager = param.getTaskManager();
- if (taskManager != 0) { //-yn
+ if (taskManager > 0) { //-yn
args.add(Constants.FLINK_TASK_MANAGE);
args.add(String.format("%d", taskManager));
}
@@ -92,12 +92,19 @@ public class FlinkArgsUtils {
args.add(queue);
}
}
+ }
- args.add(Constants.FLINK_DETACH); //-d
-
+ int parallelism = param.getParallelism();
+ if (parallelism > 0) {
+ args.add(Constants.FLINK_PARALLELISM);
+ args.add(String.format("%d", parallelism)); // -p
}
- // -p -s -yqu -yat -sae -yD -D
+ // If the job is submitted in attached mode, perform a best-effort
cluster shutdown when the CLI is terminated abruptly
+ // The task status will be synchronized with the cluster job status
+ args.add(Constants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
+
+ // -s -yqu -yat -yD -D
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
@@ -122,5 +129,4 @@ public class FlinkArgsUtils {
return args;
}
-
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
index bea6775..f030628 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
@@ -17,19 +17,20 @@
package org.apache.dolphinscheduler.server.utils;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
+
+import java.util.List;
+
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
/**
* Test FlinkArgsUtils
*/
@@ -39,6 +40,7 @@ public class FlinkArgsUtilsTest {
public String mode = "cluster";
public int slot = 2;
+ public int parallelism = 3;
public String appName = "testFlink";
public int taskManager = 4;
public String taskManagerMemory = "2G";
@@ -48,7 +50,7 @@ public class FlinkArgsUtilsTest {
public ResourceInfo mainJar = null;
public String mainArgs = "testArgs --input file:///home";
public String queue = "queue1";
- public String others = "-p 4";
+ public String others = "-s hdfs:///flink/savepoint-1537";
public String flinkVersion = "<1.10";
@@ -72,6 +74,7 @@ public class FlinkArgsUtilsTest {
param.setMainClass(mainClass);
param.setAppName(appName);
param.setSlot(slot);
+ param.setParallelism(parallelism);
param.setTaskManager(taskManager);
param.setJobManagerMemory(jobManagerMemory);
param.setTaskManagerMemory(taskManagerMemory);
@@ -89,7 +92,7 @@ public class FlinkArgsUtilsTest {
}
//Expected values and order
- assertEquals(20, result.size());
+ assertEquals(22, result.size());
assertEquals("-m", result.get(0));
assertEquals("yarn-cluster", result.get(1));
@@ -112,15 +115,18 @@ public class FlinkArgsUtilsTest {
assertEquals("-yqu", result.get(12));
assertEquals(result.get(13),queue);
- assertEquals("-d", result.get(14));
+ assertEquals("-p", result.get(14));
+ assertSame(Integer.valueOf(result.get(15)),parallelism);
+
+ assertEquals("-sae", result.get(16));
- assertEquals(result.get(15),others);
+ assertEquals(result.get(17),others);
- assertEquals("-c", result.get(16));
- assertEquals(result.get(17),mainClass);
+ assertEquals("-c", result.get(18));
+ assertEquals(result.get(19),mainClass);
- assertEquals(result.get(18),mainJar.getRes());
- assertEquals(result.get(19),mainArgs);
+ assertEquals(result.get(20),mainJar.getRes());
+ assertEquals(result.get(21),mainArgs);
//Others param without -yqu
FlinkParameters param1 = new FlinkParameters();
diff --git
a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
index 1bf7cc1..21048ae 100644
---
a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
+++
b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
@@ -79,6 +79,18 @@
</x-select>
</div>
</m-list-box>
+ <m-list-box v-if="deployMode === 'cluster'">
+ <div slot="text">{{$t('App Name')}}</div>
+ <div slot="content">
+ <x-input
+ :disabled="isDetails"
+ type="input"
+ v-model="appName"
+ :placeholder="$t('Please enter app name(optional)')"
+ autocomplete="off">
+ </x-input>
+ </div>
+ </m-list-box>
<div class="list-box-4p" v-if="deployMode === 'cluster'">
<div class="clearfix list">
<span class="sp1" style="word-break:break-all">{{$t('JobManager
Memory')}}</span>
@@ -129,6 +141,21 @@
</span>
</div>
</div>
+ <div class="list-box-4p">
+ <div class="clearfix list">
+ <span class="sp1"
style="word-break:break-all">{{$t('Parallelism')}}</span>
+ <span class="sp2">
+ <x-input
+ :disabled="isDetails"
+ type="input"
+ v-model="parallelism"
+ :placeholder="$t('Please enter Parallelism')"
+ style="width: 200px;"
+ autocomplete="off">
+ </x-input>
+ </span>
+ </div>
+ </div>
<m-list-box>
<div slot="text">{{$t('Main Arguments')}}</div>
<div slot="content">
@@ -209,12 +236,16 @@
localParams: [],
// Slot number
slot: 1,
+ // Parallelism
+ parallelism: 1,
// TaskManager mumber
taskManager: '2',
// JobManager memory
jobManagerMemory: '1G',
// TaskManager memory
taskManagerMemory: '2G',
+ // Flink app name
+ appName: '',
// Main arguments
mainArgs: '',
// Option parameters
@@ -322,6 +353,11 @@
return false
}
+ if (!Number.isInteger(parseInt(this.parallelism))) {
+ this.$message.warning(`${i18n.$t('Please enter Parallelism')}`)
+ return false
+ }
+
if (this.flinkVersion === '<1.10' &&
!Number.isInteger(parseInt(this.taskManager))) {
this.$message.warning(`${i18n.$t('Please enter TaskManager
number')}`)
return false
@@ -351,9 +387,11 @@
localParams: this.localParams,
flinkVersion: this.flinkVersion,
slot: this.slot,
+ parallelism: this.parallelism,
taskManager: this.taskManager,
jobManagerMemory: this.jobManagerMemory,
taskManagerMemory: this.taskManagerMemory,
+ appName: this.appName,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType
@@ -481,9 +519,11 @@
localParams: this.localParams,
flinkVersion: this.flinkVersion,
slot: this.slot,
+ parallelism: this.parallelism,
taskManager: this.taskManager,
jobManagerMemory: this.jobManagerMemory,
taskManagerMemory: this.taskManagerMemory,
+ appName: this.appName,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType
@@ -522,10 +562,11 @@
this.deployMode = o.params.deployMode || ''
this.flinkVersion = o.params.flinkVersion || '<1.10'
this.slot = o.params.slot || 1
+ this.parallelism = o.params.parallelism || 1
this.taskManager = o.params.taskManager || '2'
this.jobManagerMemory = o.params.jobManagerMemory || '1G'
this.taskManagerMemory = o.params.taskManagerMemory || '2G'
-
+ this.appName = o.params.appName || ''
this.mainArgs = o.params.mainArgs || ''
this.others = o.params.others
this.programType = o.params.programType || 'SCALA'
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
index 5c3de1f..5c983ab 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
@@ -118,8 +118,12 @@ export default {
'Please enter TaskManager memory': 'Please enter TaskManager memory',
'Slot Number': 'Slot Number',
'Please enter Slot number': 'Please enter Slot number',
+ Parallelism: 'Parallelism',
+ 'Please enter Parallelism': 'Please enter Parallelism',
'TaskManager Number': 'TaskManager Number',
'Please enter TaskManager number': 'Please enter TaskManager number',
+ 'App Name': 'App Name',
+ 'Please enter app name(optional)': 'Please enter app name(optional)',
'SQL Type': 'SQL Type',
Title: 'Title',
'Please enter the title of email': 'Please enter the title of email',
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
index 3368438..88382b5 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
@@ -118,8 +118,12 @@ export default {
'Please enter TaskManager memory': '请输入TaskManager内存数',
'Slot Number': 'Slot数量',
'Please enter Slot number': '请输入Slot数量',
+ Parallelism: '并行度',
+ 'Please enter Parallelism': '请输入并行度',
'TaskManager Number': 'TaskManager数量',
'Please enter TaskManager number': '请输入TaskManager数量',
+ 'App Name': '任务名称',
+ 'Please enter app name(optional)': '请输入任务名称(选填)',
'SQL Type': 'sql类型',
Title: '主题',
'Please enter the title of email': '请输入邮件主题',