This is an automated email from the ASF dual-hosted git repository.

uce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 882efcb  [FLINK-11534] [container] Allow ExecutionMode configuration
882efcb is described below

commit 882efcbdfd90ba4ccfc2f8418bf9d31850d1ca5e
Author: Ufuk Celebi <[email protected]>
AuthorDate: Tue Apr 16 16:38:16 2019 +0200

    [FLINK-11534] [container] Allow ExecutionMode configuration
---
 .../entrypoint/StandaloneJobClusterEntryPoint.java | 16 ++++-
 .../StandaloneJobClusterEntryPointTest.java        | 69 ++++++++++++++++++++++
 2 files changed, 83 insertions(+), 2 deletions(-)

diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index 651a874..c69325d 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.container.entrypoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
@@ -92,8 +93,7 @@ public final class StandaloneJobClusterEntryPoint extends 
JobClusterEntrypoint {
                }
 
                Configuration configuration = 
loadConfiguration(clusterConfiguration);
-
-               configuration.setString(ClusterEntrypoint.EXECUTION_MODE, 
ExecutionMode.DETACHED.toString());
+               setDefaultExecutionModeIfNotConfigured(configuration);
 
                StandaloneJobClusterEntryPoint entrypoint = new 
StandaloneJobClusterEntryPoint(
                        configuration,
@@ -104,4 +104,16 @@ public final class StandaloneJobClusterEntryPoint extends 
JobClusterEntrypoint {
 
                ClusterEntrypoint.runClusterEntrypoint(entrypoint);
        }
+
+       @VisibleForTesting
+       static void setDefaultExecutionModeIfNotConfigured(Configuration 
configuration) {
+               if (isNoExecutionModeConfigured(configuration)) {
+                       // In contrast to other places, the default for 
standalone job clusters is ExecutionMode.DETACHED
+                       
configuration.setString(ClusterEntrypoint.EXECUTION_MODE, 
ExecutionMode.DETACHED.toString());
+               }
+       }
+
+       private static boolean isNoExecutionModeConfigured(Configuration 
configuration) {
+               return 
configuration.getString(ClusterEntrypoint.EXECUTION_MODE, null) == null;
+       }
 }
diff --git 
a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
 
b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
new file mode 100644
index 0000000..61d37ac
--- /dev/null
+++ 
b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint.ExecutionMode;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for the {@link StandaloneJobClusterEntryPoint}.
+ */
+public class StandaloneJobClusterEntryPointTest extends TestLogger {
+
+       /**
+        * Tests that the default {@link ExecutionMode} is {@link 
ExecutionMode#DETACHED}.
+        */
+       @Test
+       public void testDefaultExecutionModeIsDetached() {
+               Configuration configuration = new Configuration();
+
+               
StandaloneJobClusterEntryPoint.setDefaultExecutionModeIfNotConfigured(configuration);
+
+               assertThat(getExecutionMode(configuration), 
equalTo(ExecutionMode.DETACHED));
+       }
+
+       /**
+        * Tests that {@link ExecutionMode} is not overwritten if provided.
+        */
+       @Test
+       public void testDontOverwriteExecutionMode() {
+               Configuration configuration = new Configuration();
+               setExecutionMode(configuration, ExecutionMode.NORMAL);
+
+               
StandaloneJobClusterEntryPoint.setDefaultExecutionModeIfNotConfigured(configuration);
+
+               // Don't overwrite provided configuration
+               assertThat(getExecutionMode(configuration), 
equalTo(ExecutionMode.NORMAL));
+       }
+
+       private static void setExecutionMode(Configuration configuration, 
ExecutionMode executionMode) {
+               configuration.setString(ClusterEntrypoint.EXECUTION_MODE, 
executionMode.toString());
+       }
+
+       private static ExecutionMode getExecutionMode(Configuration 
configuration) {
+               return 
ExecutionMode.valueOf(configuration.getString(ClusterEntrypoint.EXECUTION_MODE));
+       }
+}

Reply via email to