[FLINK-9028] [yarn] Perform parameter checks before YARN starts the cluster

This closes #5726.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38aa863d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38aa863d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38aa863d

Branch: refs/heads/master
Commit: 38aa863d5a710b283b5c9b2eb9225d6fb9cc0c70
Parents: 328f72d
Author: sihuazhou <[email protected]>
Authored: Tue Mar 20 15:59:33 2018 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Wed Mar 21 10:56:54 2018 +0100

----------------------------------------------------------------------
 .../ContaineredTaskManagerParameters.java       | 41 ++++++++++++++------
 .../flink/runtime/dispatcher/Dispatcher.java    |  2 +-
 .../ContaineredTaskManagerParametersTest.java   | 32 +++++++++++++++
 .../taskexecutor/TaskManagerServicesTest.java   |  1 -
 .../yarn/AbstractYarnClusterDescriptor.java     | 27 +++++++++++++
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  3 +-
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  4 +-
 .../flink/yarn/YarnClusterDescriptorTest.java   | 15 ++++---
 8 files changed, 103 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/38aa863d/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index c4dd486..fa7fdf4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.clusterframework;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import org.apache.flink.util.Preconditions;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -101,46 +102,62 @@ public class ContaineredTaskManagerParameters implements 
java.io.Serializable {
        // 
------------------------------------------------------------------------
        //  Factory
        // 
------------------------------------------------------------------------
-       
+
        /**
-        * Computes the parameters to be used to start a TaskManager Java 
process.
+        * Calculates the cutoff memory size used by the container. It throws an 
{@link IllegalArgumentException}
+        * if the configuration is invalid; otherwise it returns the cutoff value.
         *
         * @param config The Flink configuration.
         * @param containerMemoryMB The size of the complete container, in 
megabytes.
-        * @return The parameters to start the TaskManager processes with.
+        *
+        * @return cutoff memory size used by container.
         */
-       public static ContaineredTaskManagerParameters create(
-               Configuration config, long containerMemoryMB, int numSlots)
-       {
-               // (1) compute how much memory we subtract from the total 
memory, to get the Java memory
+       public static long calculateCutoffMB(Configuration config, long 
containerMemoryMB) {
+               Preconditions.checkArgument(containerMemoryMB > 0);
 
+               // (1) check cutoff ratio
                final float memoryCutoffRatio = config.getFloat(
                        ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
 
-               final int minCutoff = config.getInteger(
-                       ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
-
                if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) {
                        throw new IllegalArgumentException("The configuration 
value '"
                                + 
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "' must be 
between 0 and 1. Value given="
                                + memoryCutoffRatio);
                }
 
+               // (2) check min cutoff value
+               final int minCutoff = config.getInteger(
+                       ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
+
                if (minCutoff >= containerMemoryMB) {
                        throw new IllegalArgumentException("The configuration 
value '"
                                + 
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + "'='" + minCutoff
                                + "' is larger than the total container memory 
" + containerMemoryMB);
                }
 
+               // (3) compute the cutoff: the ratio-based value, but at least the minimum cutoff
                long cutoff = (long) (containerMemoryMB * memoryCutoffRatio);
                if (cutoff < minCutoff) {
                        cutoff = minCutoff;
                }
+               return cutoff;
+       }
 
-               final long javaMemorySizeMB = containerMemoryMB - cutoff;
+       /**
+        * Computes the parameters to be used to start a TaskManager Java 
process.
+        *
+        * @param config The Flink configuration.
+        * @param containerMemoryMB The size of the complete container, in 
megabytes.
+        * @return The parameters to start the TaskManager processes with.
+        */
+       public static ContaineredTaskManagerParameters create(
+               Configuration config, long containerMemoryMB, int numSlots)
+       {
+               // (1) compute how much of the container memory is cut off (not given to the JVM)
+               final long cutoffMB = calculateCutoffMB(config, 
containerMemoryMB);
 
                // (2) split the remaining Java memory between heap and off-heap
-               final long heapSizeMB = 
TaskManagerServices.calculateHeapSizeMB(javaMemorySizeMB, config);
+               final long heapSizeMB = 
TaskManagerServices.calculateHeapSizeMB(containerMemoryMB - cutoffMB, config);
                // use the cut-off memory for off-heap (that was its intention)
                final long offHeapSizeMB = containerMemoryMB - heapSizeMB;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/38aa863d/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 91a4f73..68b4046 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -313,7 +313,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        @Override
        public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
                if (jobManagerRunners.isEmpty()) {
-                       System.out.println("empty");
+                       log.info("empty");
                }
                return CompletableFuture.completedFuture(
                        Collections.unmodifiableSet(new 
HashSet<>(jobManagerRunners.keySet())));

http://git-wip-us.apache.org/repos/asf/flink/blob/38aa863d/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
index 8d9ea88..230a934 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -27,6 +28,7 @@ import static 
org.apache.flink.configuration.TaskManagerOptions.MEMORY_OFF_HEAP;
 import static 
org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBufferMemory;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class ContaineredTaskManagerParametersTest extends TestLogger {
        private static final long CONTAINER_MEMORY = 8192L;
@@ -91,4 +93,34 @@ public class ContaineredTaskManagerParametersTest extends 
TestLogger {
                assertTrue(params.taskManagerHeapSizeMB() +
                        params.taskManagerDirectMemoryLimitMB() <= 
CONTAINER_MEMORY);
        }
+
+       /**
+        * Test to guard {@link 
ContaineredTaskManagerParameters#calculateCutoffMB(Configuration, long)}.
+        */
+       @Test
+       public void testCalculateCutoffMB() throws Exception {
+
+               Configuration config = new Configuration();
+               long containerMemoryMB = 1000;
+
+               
config.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.1f);
+               
config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 128);
+
+               assertEquals(128,
+                       
ContaineredTaskManagerParameters.calculateCutoffMB(config, containerMemoryMB));
+
+               
config.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.2f);
+               assertEquals(200,
+                       
ContaineredTaskManagerParameters.calculateCutoffMB(config, containerMemoryMB));
+
+               
config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 1000);
+
+               try {
+                       
ContaineredTaskManagerParameters.calculateCutoffMB(config, containerMemoryMB);
+               } catch (IllegalArgumentException expected) {
+                       // we expected it.
+                       return;
+               }
+               fail();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/38aa863d/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
index b0c6c60..d3d5444 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
@@ -214,5 +214,4 @@ public class TaskManagerServicesTest extends TestLogger {
                config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 
0.1f); // 10%
                assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, 
config));
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/38aa863d/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index bdb471a..eab5e39 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -36,9 +36,11 @@ import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
@@ -410,6 +412,28 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
        }
 
        /**
+        * Validates the cluster specification before deploying it. It will 
throw
+        * an {@link IllegalConfigurationException} if the {@link 
ClusterSpecification} is invalid.
+        */
+       private void validateClusterSpecification(ClusterSpecification 
clusterSpecification) {
+               long taskManagerMemorySize = 
clusterSpecification.getTaskManagerMemoryMB();
+               long cutoff;
+               try {
+                       // We do the validation by calling the calculation 
methods here
+                       cutoff = 
ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, 
taskManagerMemorySize);
+               } catch (IllegalArgumentException cutoffConfigurationInvalidEx) 
{
+                       throw new IllegalConfigurationException("Configurations 
related to cutoff checked failed.", cutoffConfigurationInvalidEx);
+               }
+
+               try {
+                       // We do the validation by calling the calculation 
methods here
+                       
TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, 
flinkConfiguration);
+               } catch (IllegalArgumentException 
heapSizeConfigurationInvalidEx) {
+                       throw new IllegalConfigurationException("Configurations 
related to heap size checked failed.", heapSizeConfigurationInvalidEx);
+               }
+       }
+
+       /**
         * This method will block until the ApplicationMaster/JobManager have 
been deployed on YARN.
         *
         * @param clusterSpecification Initial cluster specification for the 
Flink cluster to be deployed
@@ -423,6 +447,9 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        @Nullable JobGraph jobGraph,
                        boolean detached) throws Exception {
 
+               // ------------------ Check if configuration is valid 
--------------------
+               validateClusterSpecification(clusterSpecification);
+
                if (UserGroupInformation.isSecurityEnabled()) {
                        // note: UGI::hasKerberosCredentials inaccurately 
reports false
                        // for logins based on a keytab (fixed in Hadoop 2.6.1, 
see HADOOP-10786),

http://git-wip-us.apache.org/repos/asf/flink/blob/38aa863d/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 2cdc19d..1443f99 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.security.SecurityConfiguration;
@@ -631,7 +632,7 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId
                                if (detachedMode) {
                                        LOG.info("The Flink YARN client has 
been started in detached mode. In order to stop " +
                                                "Flink on YARN, use the 
following command or a YARN web interface to stop it:\n" +
-                                               "yarn application -kill " + 
applicationId.getOpt());
+                                               "yarn application -kill " + 
yarnApplicationId);
                                } else {
                                        ScheduledExecutorService 
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/38aa863d/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 20ce314..5b0d422 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -264,8 +264,8 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                
configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, 7331);
                configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 
-               final int jobManagerMemory = 42;
-               final int taskManagerMemory = 41;
+               final int jobManagerMemory = 1337;
+               final int taskManagerMemory = 7331;
                final int slotsPerTaskManager = 30;
                final String[] args = {"-yjm", 
String.valueOf(jobManagerMemory), "-ytm", String.valueOf(taskManagerMemory), 
"-ys", String.valueOf(slotsPerTaskManager)};
                final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli(

http://git-wip-us.apache.org/repos/asf/flink/blob/38aa863d/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index dd8b625..52bf8bb 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
@@ -91,8 +92,11 @@ public class YarnClusterDescriptorTest extends TestLogger {
        @Test
        public void testFailIfTaskSlotsHigherThanMaxVcores() throws 
ClusterDeploymentException {
 
+               final Configuration flinkConfiguration = new Configuration();
+               
flinkConfiguration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN,
 0);
+
                YarnClusterDescriptor clusterDescriptor = new 
YarnClusterDescriptor(
-                       new Configuration(),
+                       flinkConfiguration,
                        yarnConfiguration,
                        temporaryFolder.getRoot().getAbsolutePath(),
                        yarnClient,
@@ -101,8 +105,8 @@ public class YarnClusterDescriptorTest extends TestLogger {
                clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
 
                ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
-                       .setMasterMemoryMB(-1)
-                       .setTaskManagerMemoryMB(-1)
+                       .setMasterMemoryMB(1)
+                       .setTaskManagerMemoryMB(1)
                        .setNumberTaskManagers(1)
                        .setSlotsPerTaskManager(Integer.MAX_VALUE)
                        .createClusterSpecification();
@@ -126,6 +130,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                Configuration configuration = new Configuration();
                // overwrite vcores in config
                configuration.setInteger(YarnConfigOptions.VCORES, 
Integer.MAX_VALUE);
+               
configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 
0);
 
                YarnClusterDescriptor clusterDescriptor = new 
YarnClusterDescriptor(
                        configuration,
@@ -138,8 +143,8 @@ public class YarnClusterDescriptorTest extends TestLogger {
 
                // configure slots
                ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
-                       .setMasterMemoryMB(-1)
-                       .setTaskManagerMemoryMB(-1)
+                       .setMasterMemoryMB(1)
+                       .setTaskManagerMemoryMB(1)
                        .setNumberTaskManagers(1)
                        .setSlotsPerTaskManager(1)
                        .createClusterSpecification();

Reply via email to