This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executors in repository https://gitbox.apache.org/repos/asf/flink.git
commit f9404443212331cfd32e5365255dd0430077da1e Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Tue Nov 5 12:07:29 2019 +0100 [FLINK-XXXXX] Add the ExecutionOption option --- .../generated/deployment_configuration.html | 26 ---------------- .../generated/execution_configuration.html | 5 ++++ docs/ops/config.md | 2 +- .../org/apache/flink/client/cli/DefaultCLI.java | 10 +++++++ .../apache/flink/configuration/ExecutionMode.java | 35 ++++------------------ .../flink/configuration/ExecutionOptions.java | 7 +++++ .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 4 +++ 7 files changed, 33 insertions(+), 56 deletions(-) diff --git a/docs/_includes/generated/deployment_configuration.html b/docs/_includes/generated/deployment_configuration.html deleted file mode 100644 index 7f7bdcc..0000000 --- a/docs/_includes/generated/deployment_configuration.html +++ /dev/null @@ -1,26 +0,0 @@ -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left" style="width: 20%">Key</th> - <th class="text-left" style="width: 15%">Default</th> - <th class="text-left" style="width: 65%">Description</th> - </tr> - </thead> - <tbody> - <tr> - <td><h5>execution.attached</h5></td> - <td style="word-wrap: break-word;">false</td> - <td>Specifies if the pipeline is submitted in attached or detached mode.</td> - </tr> - <tr> - <td><h5>execution.shutdown-on-attached-exit</h5></td> - <td style="word-wrap: break-word;">false</td> - <td>If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.</td> - </tr> - <tr> - <td><h5>execution.target</h5></td> - <td style="word-wrap: break-word;">(none)</td> - <td>The deployment target for the execution, e.g. "local" for local execution.</td> - </tr> - </tbody> -</table> diff --git a/docs/_includes/generated/execution_configuration.html b/docs/_includes/generated/execution_configuration.html index 7f7bdcc..3ea92eb 100644 --- a/docs/_includes/generated/execution_configuration.html +++ b/docs/_includes/generated/execution_configuration.html @@ -22,5 +22,10 @@ <td style="word-wrap: break-word;">(none)</td> <td>The deployment target for the execution, e.g. "local" for local execution.</td> </tr> + <tr> + <td><h5>execution.target.cluster-mode</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>Indicates if a pipeline should be executed on an already existing cluster (=SESSION), or a new cluster should be spawned (=PER_JOB).</td> + </tr> </tbody> </table> diff --git a/docs/ops/config.md b/docs/ops/config.md index 9390dbd..b3e44e8 100644 --- a/docs/ops/config.md +++ b/docs/ops/config.md @@ -66,7 +66,7 @@ These parameters configure the default HDFS used by Flink. Setups that do not sp ### Execution -{% include generated/deployment_configuration.html %} +{% include generated/execution_configuration.html %} {% include generated/savepoint_config_configuration.html %} ### JobManager diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java index 397d5dd..12e3e79 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java @@ -20,6 +20,9 @@ package org.apache.flink.client.cli; import org.apache.flink.client.deployment.StandaloneClientFactory; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExecutionMode; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.util.FlinkException; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; @@ -48,4 +51,11 @@ public class DefaultCLI extends AbstractCustomCommandLine { public void addGeneralOptions(Options baseOptions) { super.addGeneralOptions(baseOptions); } + + @Override + public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException { + final Configuration configuration = super.applyCommandLineOptionsToConfiguration(commandLine); + configuration.set(ExecutionOptions.CLUSTER_MODE, ExecutionMode.SESSION); + return configuration; + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionMode.java similarity index 50% copy from flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java copy to flink-core/src/main/java/org/apache/flink/configuration/ExecutionMode.java index 397d5dd..8390c73 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionMode.java @@ -16,36 +16,13 @@ * limitations under the License. */ -package org.apache.flink.client.cli; - -import org.apache.flink.client.deployment.StandaloneClientFactory; -import org.apache.flink.configuration.Configuration; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Options; +package org.apache.flink.configuration; /** - * The default CLI which is used for interaction with standalone clusters. + * Indicates if a pipeline is going to be executed on an already existing cluster ({@link ExecutionMode#SESSION}), + * or a new cluster should be spawned ({@link ExecutionMode#PER_JOB}). */ -public class DefaultCLI extends AbstractCustomCommandLine { - - public DefaultCLI(Configuration configuration) { - super(configuration); - } - - @Override - public boolean isActive(CommandLine commandLine) { - // always active because we can try to read a JobManager address from the config - return true; - } - - @Override - public String getId() { - return StandaloneClientFactory.ID; - } - - @Override - public void addGeneralOptions(Options baseOptions) { - super.addGeneralOptions(baseOptions); - } +public enum ExecutionMode { + SESSION, + PER_JOB } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java index 7d8bcee..8e7f605 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java @@ -34,6 +34,13 @@ public class ExecutionOptions { .noDefaultValue() .withDescription("The deployment target for the execution, e.g. \"local\" for local execution."); + public static final ConfigOption<ExecutionMode> CLUSTER_MODE = + key("execution.target.cluster-mode") + .enumType(ExecutionMode.class) + .noDefaultValue() + .withDescription("Indicates if a pipeline should be executed on an already existing cluster (=" + ExecutionMode.SESSION + ")" + + ", or a new cluster should be spawned (=" + ExecutionMode.PER_JOB + ")."); + public static final ConfigOption<Boolean> ATTACHED = key("execution.attached") .booleanType() diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 4f1f762..90fcbfc 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -29,6 +29,7 @@ import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.ExecutionMode; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; @@ -360,6 +361,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine { effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace); effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(applicationId)); + effectiveConfiguration.set(ExecutionOptions.CLUSTER_MODE, ExecutionMode.SESSION); + } else { + effectiveConfiguration.set(ExecutionOptions.CLUSTER_MODE, ExecutionMode.PER_JOB); } if (commandLine.hasOption(jmMemory.getOpt())) {