This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 614cc4fd Optimize flink run mode validate (#1745)
614cc4fd is described below
commit 614cc4fd0de352d866d840f1803eda2bd3611959
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Apr 27 15:10:19 2022 +0800
Optimize flink run mode validate (#1745)
---
.../apache/seatunnel/command/FlinkCommandArgs.java | 31 ++++++++++++++++--
.../FlinkRunMode.java} | 38 ++++++++--------------
.../java/org/apache/seatunnel/FlinkStarter.java | 13 +-------
3 files changed, 43 insertions(+), 39 deletions(-)
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
index de02a524..16d3b34a 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
@@ -19,14 +19,17 @@ package org.apache.seatunnel.command;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.config.EngineType;
+import org.apache.seatunnel.config.FlinkRunMode;
+import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
public class FlinkCommandArgs extends AbstractCommandArgs {
@Parameter(names = {"-r", "--run-mode"},
+ converter = RunModeConverter.class,
description = "job run mode, run or run-application")
- private String runMode = "run";
+ private FlinkRunMode runMode = FlinkRunMode.RUN;
@Override
public EngineType getEngineType() {
@@ -38,11 +41,33 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
return DeployMode.CLIENT;
}
- public String getRunMode() {
+ public FlinkRunMode getRunMode() {
return runMode;
}
- public void setRunMode(String runMode) {
+ public void setRunMode(FlinkRunMode runMode) {
this.runMode = runMode;
}
+
+ /**
+ * Used to convert the run mode string to the enum value.
+ */
+ private static class RunModeConverter implements
IStringConverter<FlinkRunMode> {
+ /**
+ * If the '-r' is not set, then will not go into this convert method.
+ *
+ * @param value input value set by '-r' or '--run-mode'
+ * @return flink run mode enum value
+ */
+ @Override
+ public FlinkRunMode convert(String value) {
+ for (FlinkRunMode flinkRunMode : FlinkRunMode.values()) {
+ if (flinkRunMode.getMode().equalsIgnoreCase(value)) {
+ return flinkRunMode;
+ }
+ }
+ throw new IllegalArgumentException(String.format("Run mode %s not
supported", value));
+ }
+ }
+
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/FlinkRunMode.java
similarity index 53%
copy from
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
copy to
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/FlinkRunMode.java
index de02a524..41e46b45 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/FlinkRunMode.java
@@ -15,34 +15,24 @@
* limitations under the License.
*/
-package org.apache.seatunnel.command;
+package org.apache.seatunnel.config;
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.config.EngineType;
-
-import com.beust.jcommander.Parameter;
-
-public class FlinkCommandArgs extends AbstractCommandArgs {
-
- @Parameter(names = {"-r", "--run-mode"},
- description = "job run mode, run or run-application")
- private String runMode = "run";
-
- @Override
- public EngineType getEngineType() {
- return EngineType.FLINK;
- }
+/**
+ * Flink run mode, used to determine whether to run in local or cluster mode.
+ * <a
href="https://flink.apache.org/news/2020/07/14/application-mode.html">application-mode</a>
+ */
+public enum FlinkRunMode {
+ RUN("run"),
+ APPLICATION_RUN("run-application"),
+ ;
- @Override
- public DeployMode getDeployMode() {
- return DeployMode.CLIENT;
- }
+ private final String mode;
- public String getRunMode() {
- return runMode;
+ FlinkRunMode(String mode) {
+ this.mode = mode;
}
- public void setRunMode(String runMode) {
- this.runMode = runMode;
+ public String getMode() {
+ return mode;
}
}
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
index 59355506..4f9ed83e 100644
---
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
@@ -51,20 +51,9 @@ public class FlinkStarter implements Starter {
* SeaTunnel flink job jar.
*/
private final String appJar;
- private final String runMode;
FlinkStarter(String[] args) {
this.flinkCommandArgs = parseArgs(args);
-
- String mode = flinkCommandArgs.getRunMode();
- switch (mode) {
- case RUN_MODE_RUN:
- case RUN_MODE_APPLICATION:
- this.runMode = mode;
- break;
- default:
- throw new IllegalArgumentException("Run mode " + mode + " not
supported");
- }
// set the deployment mode, used to get the job jar path.
Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
@@ -105,7 +94,7 @@ public class FlinkStarter implements Starter {
public List<String> buildCommands() {
List<String> command = new ArrayList<>();
command.add("${FLINK_HOME}/bin/flink");
- command.add(runMode);
+ command.add(flinkCommandArgs.getRunMode().getMode());
command.addAll(flinkParams);
command.add("-c");
command.add(APP_NAME);