This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f9404443212331cfd32e5365255dd0430077da1e
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Tue Nov 5 12:07:29 2019 +0100

    [FLINK-XXXXX] Add the ExecutionOption option
---
 .../generated/deployment_configuration.html        | 26 ----------------
 .../generated/execution_configuration.html         |  5 ++++
 docs/ops/config.md                                 |  2 +-
 .../org/apache/flink/client/cli/DefaultCLI.java    | 10 +++++++
 .../apache/flink/configuration/ExecutionMode.java  | 35 ++++------------------
 .../flink/configuration/ExecutionOptions.java      |  7 +++++
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |  4 +++
 7 files changed, 33 insertions(+), 56 deletions(-)

diff --git a/docs/_includes/generated/deployment_configuration.html b/docs/_includes/generated/deployment_configuration.html
deleted file mode 100644
index 7f7bdcc..0000000
--- a/docs/_includes/generated/deployment_configuration.html
+++ /dev/null
@@ -1,26 +0,0 @@
-<table class="table table-bordered">
-    <thead>
-        <tr>
-            <th class="text-left" style="width: 20%">Key</th>
-            <th class="text-left" style="width: 15%">Default</th>
-            <th class="text-left" style="width: 65%">Description</th>
-        </tr>
-    </thead>
-    <tbody>
-        <tr>
-            <td><h5>execution.attached</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Specifies if the pipeline is submitted in attached or detached mode.</td>
-        </tr>
-        <tr>
-            <td><h5>execution.shutdown-on-attached-exit</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.</td>
-        </tr>
-        <tr>
-            <td><h5>execution.target</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>The deployment target for the execution, e.g. "local" for local execution.</td>
-        </tr>
-    </tbody>
-</table>
diff --git a/docs/_includes/generated/execution_configuration.html b/docs/_includes/generated/execution_configuration.html
index 7f7bdcc..3ea92eb 100644
--- a/docs/_includes/generated/execution_configuration.html
+++ b/docs/_includes/generated/execution_configuration.html
@@ -22,5 +22,10 @@
             <td style="word-wrap: break-word;">(none)</td>
             <td>The deployment target for the execution, e.g. "local" for local execution.</td>
         </tr>
+        <tr>
+            <td><h5>execution.target.cluster-mode</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Indicates if a pipeline should be executed on an already existing cluster (=SESSION), or a new cluster should be spawned (=PER_JOB).</td>
+        </tr>
     </tbody>
 </table>
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 9390dbd..b3e44e8 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -66,7 +66,7 @@ These parameters configure the default HDFS used by Flink. Setups that do not sp
 
 ### Execution
 
-{% include generated/deployment_configuration.html %}
+{% include generated/execution_configuration.html %}
 {% include generated/savepoint_config_configuration.html %}
 
 ### JobManager
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 397d5dd..12e3e79 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -20,6 +20,9 @@ package org.apache.flink.client.cli;
 
 import org.apache.flink.client.deployment.StandaloneClientFactory;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.util.FlinkException;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -48,4 +51,11 @@ public class DefaultCLI extends AbstractCustomCommandLine {
        public void addGeneralOptions(Options baseOptions) {
                super.addGeneralOptions(baseOptions);
        }
+
+       @Override
+       public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
+               final Configuration configuration = super.applyCommandLineOptionsToConfiguration(commandLine);
+               configuration.set(ExecutionOptions.CLUSTER_MODE, ExecutionMode.SESSION);
+               return configuration;
+       }
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionMode.java
similarity index 50%
copy from flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
copy to flink-core/src/main/java/org/apache/flink/configuration/ExecutionMode.java
index 397d5dd..8390c73 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionMode.java
@@ -16,36 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.cli;
-
-import org.apache.flink.client.deployment.StandaloneClientFactory;
-import org.apache.flink.configuration.Configuration;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
+package org.apache.flink.configuration;
 
 /**
- * The default CLI which is used for interaction with standalone clusters.
+ * Indicates if a pipeline is going to be executed on an already existing cluster ({@link ExecutionMode#SESSION}),
+ * or a new cluster should be spawned ({@link ExecutionMode#PER_JOB}).
  */
-public class DefaultCLI extends AbstractCustomCommandLine {
-
-       public DefaultCLI(Configuration configuration) {
-               super(configuration);
-       }
-
-       @Override
-       public boolean isActive(CommandLine commandLine) {
-               // always active because we can try to read a JobManager address from the config
-               return true;
-       }
-
-       @Override
-       public String getId() {
-               return StandaloneClientFactory.ID;
-       }
-
-       @Override
-       public void addGeneralOptions(Options baseOptions) {
-               super.addGeneralOptions(baseOptions);
-       }
+public enum ExecutionMode {
+       SESSION,
+       PER_JOB
 }
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
index 7d8bcee..8e7f605 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
@@ -34,6 +34,13 @@ public class ExecutionOptions {
                                        .noDefaultValue()
                                        .withDescription("The deployment target for the execution, e.g. \"local\" for local execution.");
 
+       public static final ConfigOption<ExecutionMode> CLUSTER_MODE =
+                       key("execution.target.cluster-mode")
+                                       .enumType(ExecutionMode.class)
+                                       .noDefaultValue()
+                                       .withDescription("Indicates if a pipeline should be executed on an already existing cluster (=" + ExecutionMode.SESSION + ")" +
+                                                       ", or a new cluster should be spawned (=" + ExecutionMode.PER_JOB + ").");
+
        public static final ConfigOption<Boolean> ATTACHED =
                        key("execution.attached")
                                        .booleanType()
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 4f1f762..90fcbfc 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -29,6 +29,7 @@ import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ExecutionMode;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -360,6 +361,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 
                        effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);
                        effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(applicationId));
+                       effectiveConfiguration.set(ExecutionOptions.CLUSTER_MODE, ExecutionMode.SESSION);
+               } else {
+                       effectiveConfiguration.set(ExecutionOptions.CLUSTER_MODE, ExecutionMode.PER_JOB);
                }
 
                if (commandLine.hasOption(jmMemory.getOpt())) {

Reply via email to