This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch 1.3.6-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/1.3.6-prepare by this push:
     new 505da21  [1.3.6-prepare][Feature][Flink] Support name and parallelism input #4285 #4937 (#4976)
505da21 is described below

commit 505da21fc262b18cfd0767f9e72f02b6280f7344
Author: Shiwen Cheng <[email protected]>
AuthorDate: Sat Mar 6 23:11:36 2021 +0800

    [1.3.6-prepare][Feature][Flink] Support name and parallelism input #4285 #4937 (#4976)
---
 .../apache/dolphinscheduler/common/Constants.java  |  3 +-
 .../common/task/flink/FlinkParameters.java         | 27 ++++++++++----
 .../server/utils/FlinkArgsUtils.java               | 20 ++++++----
 .../server/utils/FlinkArgsUtilsTest.java           | 32 +++++++++-------
 .../pages/dag/_source/formModel/tasks/flink.vue    | 43 +++++++++++++++++++++-
 .../src/js/module/i18n/locale/en_US.js             |  4 ++
 .../src/js/module/i18n/locale/zh_CN.js             |  4 ++
 7 files changed, 104 insertions(+), 29 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index f8ba887..5dc6aad 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -828,8 +828,9 @@ public final class Constants {
 
     public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
     public static final String FLINK_TASK_MANAGE_MEM = "-ytm";
-    public static final String FLINK_DETACH = "-d";
     public static final String FLINK_MAIN_CLASS = "-c";
+    public static final String FLINK_PARALLELISM = "-p";
+    public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae";
 
 
     public static final int[] NOT_TERMINATED_STATES = new int[]{
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
index 231dd33..82613f2 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.common.task.flink;
 
 import org.apache.dolphinscheduler.common.enums.ProgramType;
@@ -39,8 +40,8 @@ public class FlinkParameters extends AbstractParameters {
   private String mainClass;
 
   /**
-   * deploy mode  yarn-cluster  yarn-client  yarn-local
-    */
+   * deploy mode  yarn-cluster yarn-local
+   */
   private String deployMode;
 
   /**
@@ -54,25 +55,29 @@ public class FlinkParameters extends AbstractParameters {
   private int slot;
 
   /**
-   *Yarn application name
+   * parallelism
    */
+  private int parallelism;
 
+  /**
+   * yarn application name
+   */
   private String appName;
 
   /**
    * taskManager count
    */
-  private int  taskManager;
+  private int taskManager;
 
   /**
    * job manager memory
    */
-  private String  jobManagerMemory ;
+  private String jobManagerMemory;
 
   /**
    * task manager memory
    */
-  private String  taskManagerMemory;
+  private String taskManagerMemory;
 
   /**
    * resource list
@@ -140,6 +145,14 @@ public class FlinkParameters extends AbstractParameters {
     this.slot = slot;
   }
 
+  public int getParallelism() {
+    return parallelism;
+  }
+
+  public void setParallelism(int parallelism) {
+    this.parallelism = parallelism;
+  }
+
   public String getAppName() {
     return appName;
   }
@@ -217,7 +230,6 @@ public class FlinkParameters extends AbstractParameters {
     return mainJar != null && programType != null;
   }
 
-
   @Override
   public List<ResourceInfo> getResourceFilesList() {
     if (mainJar != null && !resourceList.contains(mainJar)) {
@@ -225,4 +237,5 @@ public class FlinkParameters extends AbstractParameters {
     }
     return resourceList;
   }
+
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
index 519ddf2..8b3b691 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
@@ -17,11 +17,11 @@
 
 package org.apache.dolphinscheduler.server.utils;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -53,7 +53,7 @@ public class FlinkArgsUtils {
             args.add(Constants.FLINK_YARN_CLUSTER);   //yarn-cluster
 
             int slot = param.getSlot();
-            if (slot != 0) {
+            if (slot > 0) {
                 args.add(Constants.FLINK_YARN_SLOT);
                 args.add(String.format("%d", slot));   //-ys
             }
@@ -68,7 +68,7 @@ public class FlinkArgsUtils {
             String flinkVersion = param.getFlinkVersion();
             if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
                 int taskManager = param.getTaskManager();
-                if (taskManager != 0) {                        //-yn
+                if (taskManager > 0) {                        //-yn
                     args.add(Constants.FLINK_TASK_MANAGE);
                     args.add(String.format("%d", taskManager));
                 }
@@ -92,12 +92,19 @@ public class FlinkArgsUtils {
                     args.add(queue);
                 }
             }
+        }
 
-            args.add(Constants.FLINK_DETACH); //-d
-
+        int parallelism = param.getParallelism();
+        if (parallelism > 0) {
+            args.add(Constants.FLINK_PARALLELISM);
+            args.add(String.format("%d", parallelism));   // -p
         }
 
-        // -p -s -yqu -yat -sae -yD -D
+        // If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly
+        // The task status will be synchronized with the cluster job status
+        args.add(Constants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
+
+        // -s -yqu -yat -yD -D
         if (StringUtils.isNotEmpty(others)) {
             args.add(others);
         }
@@ -122,5 +129,4 @@ public class FlinkArgsUtils {
         return args;
     }
 
-
 }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
index bea6775..f030628 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
@@ -17,19 +17,20 @@
 
 package org.apache.dolphinscheduler.server.utils;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
 import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
+
+import java.util.List;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
 /**
  * Test FlinkArgsUtils
  */
@@ -39,6 +40,7 @@ public class FlinkArgsUtilsTest {
 
     public String mode = "cluster";
     public int slot = 2;
+    public int parallelism = 3;
     public String appName = "testFlink";
     public int taskManager = 4;
     public String taskManagerMemory = "2G";
@@ -48,7 +50,7 @@ public class FlinkArgsUtilsTest {
     public ResourceInfo mainJar = null;
     public String mainArgs = "testArgs --input file:///home";
     public String queue = "queue1";
-    public String others = "-p 4";
+    public String others = "-s hdfs:///flink/savepoint-1537";
     public String flinkVersion = "<1.10";
 
 
@@ -72,6 +74,7 @@ public class FlinkArgsUtilsTest {
         param.setMainClass(mainClass);
         param.setAppName(appName);
         param.setSlot(slot);
+        param.setParallelism(parallelism);
         param.setTaskManager(taskManager);
         param.setJobManagerMemory(jobManagerMemory);
         param.setTaskManagerMemory(taskManagerMemory);
@@ -89,7 +92,7 @@ public class FlinkArgsUtilsTest {
         }
 
         //Expected values and order
-        assertEquals(20, result.size());
+        assertEquals(22, result.size());
 
         assertEquals("-m", result.get(0));
         assertEquals("yarn-cluster", result.get(1));
@@ -112,15 +115,18 @@ public class FlinkArgsUtilsTest {
         assertEquals("-yqu", result.get(12));
         assertEquals(result.get(13),queue);
 
-        assertEquals("-d", result.get(14));
+        assertEquals("-p", result.get(14));
+        assertSame(Integer.valueOf(result.get(15)),parallelism);
+
+        assertEquals("-sae", result.get(16));
 
-        assertEquals(result.get(15),others);
+        assertEquals(result.get(17),others);
 
-        assertEquals("-c", result.get(16));
-        assertEquals(result.get(17),mainClass);
+        assertEquals("-c", result.get(18));
+        assertEquals(result.get(19),mainClass);
 
-        assertEquals(result.get(18),mainJar.getRes());
-        assertEquals(result.get(19),mainArgs);
+        assertEquals(result.get(20),mainJar.getRes());
+        assertEquals(result.get(21),mainArgs);
 
         //Others param without -yqu
         FlinkParameters param1 = new FlinkParameters();
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
index 1bf7cc1..21048ae 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
@@ -79,6 +79,18 @@
         </x-select>
       </div>
     </m-list-box>
+    <m-list-box v-if="deployMode === 'cluster'">
+      <div slot="text">{{$t('App Name')}}</div>
+      <div slot="content">
+        <x-input
+                :disabled="isDetails"
+                type="input"
+                v-model="appName"
+                :placeholder="$t('Please enter app name(optional)')"
+                autocomplete="off">
+        </x-input>
+      </div>
+    </m-list-box>
     <div class="list-box-4p" v-if="deployMode === 'cluster'">
       <div class="clearfix list">
         <span class="sp1" style="word-break:break-all">{{$t('JobManager Memory')}}</span>
@@ -129,6 +141,21 @@
         </span>
       </div>
     </div>
+    <div class="list-box-4p">
+      <div class="clearfix list">
+        <span class="sp1" style="word-break:break-all">{{$t('Parallelism')}}</span>
+        <span class="sp2">
+          <x-input
+                  :disabled="isDetails"
+                  type="input"
+                  v-model="parallelism"
+                  :placeholder="$t('Please enter Parallelism')"
+                  style="width: 200px;"
+                  autocomplete="off">
+          </x-input>
+        </span>
+      </div>
+    </div>
     <m-list-box>
       <div slot="text">{{$t('Main Arguments')}}</div>
       <div slot="content">
@@ -209,12 +236,16 @@
         localParams: [],
         // Slot number
         slot: 1,
+        // Parallelism
+        parallelism: 1,
         // TaskManager mumber
         taskManager: '2',
         // JobManager memory
         jobManagerMemory: '1G',
         // TaskManager memory
         taskManagerMemory: '2G',
+        // Flink app name
+        appName: '',
         // Main arguments
         mainArgs: '',
         // Option parameters
@@ -322,6 +353,11 @@
           return false
         }
 
+        if (!Number.isInteger(parseInt(this.parallelism))) {
+          this.$message.warning(`${i18n.$t('Please enter Parallelism')}`)
+          return false
+        }
+
         if (this.flinkVersion === '<1.10' && !Number.isInteger(parseInt(this.taskManager))) {
           this.$message.warning(`${i18n.$t('Please enter TaskManager number')}`)
           return false
@@ -351,9 +387,11 @@
           localParams: this.localParams,
           flinkVersion: this.flinkVersion,
           slot: this.slot,
+          parallelism: this.parallelism,
           taskManager: this.taskManager,
           jobManagerMemory: this.jobManagerMemory,
           taskManagerMemory: this.taskManagerMemory,
+          appName: this.appName,
           mainArgs: this.mainArgs,
           others: this.others,
           programType: this.programType
@@ -481,9 +519,11 @@
           localParams: this.localParams,
           flinkVersion: this.flinkVersion,
           slot: this.slot,
+          parallelism: this.parallelism,
           taskManager: this.taskManager,
           jobManagerMemory: this.jobManagerMemory,
           taskManagerMemory: this.taskManagerMemory,
+          appName: this.appName,
           mainArgs: this.mainArgs,
           others: this.others,
           programType: this.programType
@@ -522,10 +562,11 @@
           this.deployMode = o.params.deployMode || ''
           this.flinkVersion = o.params.flinkVersion || '<1.10'
           this.slot = o.params.slot || 1
+          this.parallelism = o.params.parallelism || 1
           this.taskManager = o.params.taskManager || '2'
           this.jobManagerMemory = o.params.jobManagerMemory || '1G'
           this.taskManagerMemory = o.params.taskManagerMemory || '2G'
-
+          this.appName = o.params.appName || ''
           this.mainArgs = o.params.mainArgs || ''
           this.others = o.params.others
           this.programType = o.params.programType || 'SCALA'
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
index 5c3de1f..5c983ab 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
@@ -118,8 +118,12 @@ export default {
   'Please enter TaskManager memory': 'Please enter TaskManager memory',
   'Slot Number': 'Slot Number',
   'Please enter Slot number': 'Please enter Slot number',
+  Parallelism: 'Parallelism',
+  'Please enter Parallelism': 'Please enter Parallelism',
   'TaskManager Number': 'TaskManager Number',
   'Please enter TaskManager number': 'Please enter TaskManager number',
+  'App Name': 'App Name',
+  'Please enter app name(optional)': 'Please enter app name(optional)',
   'SQL Type': 'SQL Type',
   Title: 'Title',
   'Please enter the title of email': 'Please enter the title of email',
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
index 3368438..88382b5 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
@@ -118,8 +118,12 @@ export default {
   'Please enter TaskManager memory': '请输入TaskManager内存数',
   'Slot Number': 'Slot数量',
   'Please enter Slot number': '请输入Slot数量',
+  Parallelism: '并行度',
+  'Please enter Parallelism': '请输入并行度',
   'TaskManager Number': 'TaskManager数量',
   'Please enter TaskManager number': '请输入TaskManager数量',
+  'App Name': '任务名称',
+  'Please enter app name(optional)': '请输入任务名称(选填)',
   'SQL Type': 'sql类型',
   Title: '主题',
   'Please enter the title of email': '请输入邮件主题',

Reply via email to