This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 614cc4fd Optimize flink run mode validate (#1745)
614cc4fd is described below

commit 614cc4fd0de352d866d840f1803eda2bd3611959
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Apr 27 15:10:19 2022 +0800

    Optimize flink run mode validate (#1745)
---
 .../apache/seatunnel/command/FlinkCommandArgs.java | 31 ++++++++++++++++--
 .../FlinkRunMode.java}                             | 38 ++++++++--------------
 .../java/org/apache/seatunnel/FlinkStarter.java    | 13 +-------
 3 files changed, 43 insertions(+), 39 deletions(-)

diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
index de02a524..16d3b34a 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
+++ 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
@@ -19,14 +19,17 @@ package org.apache.seatunnel.command;
 
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.config.EngineType;
+import org.apache.seatunnel.config.FlinkRunMode;
 
+import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.Parameter;
 
 public class FlinkCommandArgs extends AbstractCommandArgs {
 
     @Parameter(names = {"-r", "--run-mode"},
+        converter = RunModeConverter.class,
         description = "job run mode, run or run-application")
-    private String runMode = "run";
+    private FlinkRunMode runMode = FlinkRunMode.RUN;
 
     @Override
     public EngineType getEngineType() {
@@ -38,11 +41,33 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
         return DeployMode.CLIENT;
     }
 
-    public String getRunMode() {
+    public FlinkRunMode getRunMode() {
         return runMode;
     }
 
-    public void setRunMode(String runMode) {
+    public void setRunMode(FlinkRunMode runMode) {
         this.runMode = runMode;
     }
+
+    /**
+     * Used to convert the run mode string to the enum value.
+     */
+    private static class RunModeConverter implements 
IStringConverter<FlinkRunMode> {
+        /**
+         * If the '-r' is not set, then will not go into this convert method.
+         *
+         * @param value input value set by '-r' or '--run-mode'
+         * @return flink run mode enum value
+         */
+        @Override
+        public FlinkRunMode convert(String value) {
+            for (FlinkRunMode flinkRunMode : FlinkRunMode.values()) {
+                if (flinkRunMode.getMode().equalsIgnoreCase(value)) {
+                    return flinkRunMode;
+                }
+            }
+            throw new IllegalArgumentException(String.format("Run mode %s not 
supported", value));
+        }
+    }
+
 }
diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/FlinkRunMode.java
similarity index 53%
copy from 
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
copy to 
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/FlinkRunMode.java
index de02a524..41e46b45 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
+++ 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/FlinkRunMode.java
@@ -15,34 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command;
+package org.apache.seatunnel.config;
 
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.config.EngineType;
-
-import com.beust.jcommander.Parameter;
-
-public class FlinkCommandArgs extends AbstractCommandArgs {
-
-    @Parameter(names = {"-r", "--run-mode"},
-        description = "job run mode, run or run-application")
-    private String runMode = "run";
-
-    @Override
-    public EngineType getEngineType() {
-        return EngineType.FLINK;
-    }
+/**
+ * Flink run mode, used to determine whether to run in local or cluster mode.
+ * <a 
href="https://flink.apache.org/news/2020/07/14/application-mode.html";>application-mode</a>
+ */
+public enum FlinkRunMode {
+    RUN("run"),
+    APPLICATION_RUN("run-application"),
+    ;
 
-    @Override
-    public DeployMode getDeployMode() {
-        return DeployMode.CLIENT;
-    }
+    private final String mode;
 
-    public String getRunMode() {
-        return runMode;
+    FlinkRunMode(String mode) {
+        this.mode = mode;
     }
 
-    public void setRunMode(String runMode) {
-        this.runMode = runMode;
+    public String getMode() {
+        return mode;
     }
 }
diff --git 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
index 59355506..4f9ed83e 100644
--- 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
+++ 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
@@ -51,20 +51,9 @@ public class FlinkStarter implements Starter {
      * SeaTunnel flink job jar.
      */
     private final String appJar;
-    private final String runMode;
 
     FlinkStarter(String[] args) {
         this.flinkCommandArgs = parseArgs(args);
-
-        String mode = flinkCommandArgs.getRunMode();
-        switch (mode) {
-            case RUN_MODE_RUN:
-            case RUN_MODE_APPLICATION:
-                this.runMode = mode;
-                break;
-            default:
-                throw new IllegalArgumentException("Run mode " + mode + " not 
supported");
-        }
         // set the deployment mode, used to get the job jar path.
         Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
         this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
@@ -105,7 +94,7 @@ public class FlinkStarter implements Starter {
     public List<String> buildCommands() {
         List<String> command = new ArrayList<>();
         command.add("${FLINK_HOME}/bin/flink");
-        command.add(runMode);
+        command.add(flinkCommandArgs.getRunMode().getMode());
         command.addAll(flinkParams);
         command.add("-c");
         command.add(APP_NAME);

Reply via email to