Repository: flink
Updated Branches:
  refs/heads/release-1.5 bfd8229d8 -> 27189d805


[FLINK-9028] [yarn] Improve failure message if cluster cannot be started


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6f91334
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6f91334
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6f91334

Branch: refs/heads/release-1.5
Commit: c6f91334b67d589f0c17ed75c9dbcbaedaf8ba51
Parents: fecc190
Author: Till Rohrmann <[email protected]>
Authored: Tue Mar 20 15:38:02 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Wed Mar 21 10:58:39 2018 +0100

----------------------------------------------------------------------
 .../ContaineredTaskManagerParameters.java       | 28 ++++++++++----------
 .../ContaineredTaskManagerParametersTest.java   |  9 +++----
 .../taskexecutor/TaskManagerServicesTest.java   |  2 +-
 .../yarn/AbstractYarnClusterDescriptor.java     | 26 +++++++++---------
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  1 -
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  9 ++++---
 6 files changed, 37 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c6f91334/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index fa7fdf4..a4e7d25 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -32,23 +32,22 @@ import java.util.Map;
 public class ContaineredTaskManagerParameters implements java.io.Serializable {
 
        private static final long serialVersionUID = -3096987654278064670L;
-       
-       /** Total container memory, in bytes */
+
+       /** Total container memory, in megabytes. */
        private final long totalContainerMemoryMB;
 
-       /** Heap size to be used for the Java process */
+       /** Heap size to be used for the Java process. */
        private final long taskManagerHeapSizeMB;
 
-       /** Direct memory limit for the Java process */
+       /** Direct memory limit for the Java process. */
        private final long taskManagerDirectMemoryLimitMB;
 
-       /** The number of slots per TaskManager */
+       /** The number of slots per TaskManager. */
        private final int numSlots;
-       
-       /** Environment variables to add to the Java process */
+
+       /** Environment variables to add to the Java process. */
        private final HashMap<String, String> taskManagerEnv;
 
-       
        public ContaineredTaskManagerParameters(
                        long totalContainerMemoryMB,
                        long taskManagerHeapSizeMB,
@@ -62,7 +61,7 @@ public class ContaineredTaskManagerParameters implements 
java.io.Serializable {
                this.numSlots = numSlots;
                this.taskManagerEnv = taskManagerEnv;
        }
-       
+
        // 
------------------------------------------------------------------------
 
        public long taskManagerTotalMemoryMB() {
@@ -87,7 +86,7 @@ public class ContaineredTaskManagerParameters implements 
java.io.Serializable {
 
 
        // 
------------------------------------------------------------------------
-       
+
        @Override
        public String toString() {
                return "TaskManagerParameters {" +
@@ -104,7 +103,7 @@ public class ContaineredTaskManagerParameters implements 
java.io.Serializable {
        // 
------------------------------------------------------------------------
 
        /**
-        * calcuate cutoff memory size used by container, it will throw an 
{@link IllegalArgumentException}
+        * Calculate cutoff memory size used by container, it will throw an 
{@link IllegalArgumentException}
         * if the config is invalid or return the cutoff value if valid.
         *
         * @param config The Flink configuration.
@@ -151,8 +150,9 @@ public class ContaineredTaskManagerParameters implements 
java.io.Serializable {
         * @return The parameters to start the TaskManager processes with.
         */
        public static ContaineredTaskManagerParameters create(
-               Configuration config, long containerMemoryMB, int numSlots)
-       {
+                       Configuration config,
+                       long containerMemoryMB,
+                       int numSlots) {
                // (1) try to compute how much memory used by container
                final long cutoffMB = calculateCutoffMB(config, 
containerMemoryMB);
 
@@ -164,7 +164,7 @@ public class ContaineredTaskManagerParameters implements 
java.io.Serializable {
                // (3) obtain the additional environment variables from the 
configuration
                final HashMap<String, String> envVars = new HashMap<>();
                final String prefix = 
ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
-               
+
                for (String key : config.keySet()) {
                        if (key.startsWith(prefix) && key.length() > 
prefix.length()) {
                                // remove prefix

http://git-wip-us.apache.org/repos/asf/flink/blob/c6f91334/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
index 230a934..8537d17 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
@@ -98,10 +98,10 @@ public class ContaineredTaskManagerParametersTest extends 
TestLogger {
         * Test to guard {@link 
ContaineredTaskManagerParameters#calculateCutoffMB(Configuration, long)}.
         */
        @Test
-       public void testCalculateCutoffMB() throws Exception {
+       public void testCalculateCutoffMB() {
 
                Configuration config = new Configuration();
-               long containerMemoryMB = 1000;
+               long containerMemoryMB = 1000L;
 
                
config.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.1f);
                
config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 128);
@@ -117,10 +117,9 @@ public class ContaineredTaskManagerParametersTest extends 
TestLogger {
 
                try {
                        
ContaineredTaskManagerParameters.calculateCutoffMB(config, containerMemoryMB);
-               } catch (IllegalArgumentException expected) {
+                       fail("Expected to fail with an invalid argument 
exception.");
+               } catch (IllegalArgumentException ignored) {
                        // we expected it.
-                       return;
                }
-               fail();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6f91334/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
index d3d5444..f6e7b07 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
@@ -44,7 +44,7 @@ public class TaskManagerServicesTest extends TestLogger {
         */
        @SuppressWarnings("deprecation")
        @Test
-       public void calculateNetworkBufOld() throws Exception {
+       public void calculateNetworkBufOld() {
                Configuration config = new Configuration();
                config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6f91334/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index eab5e39..caf7a76 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -413,23 +413,23 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
 
        /**
         * Method to validate cluster specification before deploy it, it will 
throw
-        * an {@link IllegalConfigurationException} if the {@link 
ClusterSpecification} is invalid.
+        * an {@link FlinkException} if the {@link ClusterSpecification} is 
invalid.
+        *
+        * @param clusterSpecification cluster specification to check against 
the configuration of the
+        *                             AbstractYarnClusterDescriptor
+        * @throws FlinkException if the cluster cannot be started with the 
provided {@link ClusterSpecification}
         */
-       private void validateClusterSpecification(ClusterSpecification 
clusterSpecification) {
-               long taskManagerMemorySize = 
clusterSpecification.getTaskManagerMemoryMB();
-               long cutoff;
-               try {
-                       // We do the validation by calling the calculation 
methods here
-                       cutoff = 
ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, 
taskManagerMemorySize);
-               } catch (IllegalArgumentException cutoffConfigurationInvalidEx) 
{
-                       throw new IllegalConfigurationException("Configurations 
related to cutoff checked failed.", cutoffConfigurationInvalidEx);
-               }
-
+       private void validateClusterSpecification(ClusterSpecification 
clusterSpecification) throws FlinkException {
                try {
+                       final long taskManagerMemorySize = 
clusterSpecification.getTaskManagerMemoryMB();
                        // We do the validation by calling the calculation 
methods here
+                       // Internally these methods will check whether the 
cluster can be started with the provided
+                       // ClusterSpecification and the configured memory 
requirements
+                       final long cutoff = 
ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, 
taskManagerMemorySize);
                        
TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, 
flinkConfiguration);
-               } catch (IllegalArgumentException 
heapSizeConfigurationInvalidEx) {
-                       throw new IllegalConfigurationException("Configurations 
related to heap size checked failed.", heapSizeConfigurationInvalidEx);
+               } catch (IllegalArgumentException iae) {
+                       throw new FlinkException("Cannot fulfill the minimum 
memory requirements with the provided " +
+                               "cluster specification. Please increase the 
memory of the cluster.", iae);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6f91334/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 1443f99..2311e87 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -31,7 +31,6 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.security.SecurityConfiguration;

http://git-wip-us.apache.org/repos/asf/flink/blob/c6f91334/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 5b0d422..62110ed 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -260,13 +260,14 @@ public class FlinkYarnSessionCliTest extends TestLogger {
        @Test
        public void testCommandLineClusterSpecification() throws Exception {
                final Configuration configuration = new Configuration();
-               
configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, 1337);
-               
configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, 7331);
-               configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
-
                final int jobManagerMemory = 1337;
                final int taskManagerMemory = 7331;
                final int slotsPerTaskManager = 30;
+
+               
configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, 
jobManagerMemory);
+               
configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, 
taskManagerMemory);
+               configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
slotsPerTaskManager);
+
                final String[] args = {"-yjm", 
String.valueOf(jobManagerMemory), "-ytm", String.valueOf(taskManagerMemory), 
"-ys", String.valueOf(slotsPerTaskManager)};
                final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli(
                        configuration,

Reply via email to