Repository: flink Updated Branches: refs/heads/release-1.5 bfd8229d8 -> 27189d805
[FLINK-9028] [yarn] Improve failure message if cluster cannot be started Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6f91334 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6f91334 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6f91334 Branch: refs/heads/release-1.5 Commit: c6f91334b67d589f0c17ed75c9dbcbaedaf8ba51 Parents: fecc190 Author: Till Rohrmann <[email protected]> Authored: Tue Mar 20 15:38:02 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Wed Mar 21 10:58:39 2018 +0100 ---------------------------------------------------------------------- .../ContaineredTaskManagerParameters.java | 28 ++++++++++---------- .../ContaineredTaskManagerParametersTest.java | 9 +++---- .../taskexecutor/TaskManagerServicesTest.java | 2 +- .../yarn/AbstractYarnClusterDescriptor.java | 26 +++++++++--------- .../flink/yarn/cli/FlinkYarnSessionCli.java | 1 - .../flink/yarn/FlinkYarnSessionCliTest.java | 9 ++++--- 6 files changed, 37 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c6f91334/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java index fa7fdf4..a4e7d25 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java @@ -32,23 +32,22 @@ import java.util.Map; public class ContaineredTaskManagerParameters implements java.io.Serializable { private static final long serialVersionUID = -3096987654278064670L; - - /** Total container memory, in bytes */ + + /** Total container memory, in bytes. */ private final long totalContainerMemoryMB; - /** Heap size to be used for the Java process */ + /** Heap size to be used for the Java process. */ private final long taskManagerHeapSizeMB; - /** Direct memory limit for the Java process */ + /** Direct memory limit for the Java process. */ private final long taskManagerDirectMemoryLimitMB; - /** The number of slots per TaskManager */ + /** The number of slots per TaskManager. */ private final int numSlots; - - /** Environment variables to add to the Java process */ + + /** Environment variables to add to the Java process. */ private final HashMap<String, String> taskManagerEnv; - public ContaineredTaskManagerParameters( long totalContainerMemoryMB, long taskManagerHeapSizeMB, @@ -62,7 +61,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable { this.numSlots = numSlots; this.taskManagerEnv = taskManagerEnv; } - + // ------------------------------------------------------------------------ public long taskManagerTotalMemoryMB() { @@ -87,7 +86,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable { // ------------------------------------------------------------------------ - + @Override public String toString() { return "TaskManagerParameters {" + @@ -104,7 +103,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable { // ------------------------------------------------------------------------ /** - * calcuate cutoff memory size used by container, it will throw an {@link IllegalArgumentException} + * Calcuate cutoff memory size used by container, it will throw an {@link IllegalArgumentException} * if the config is invalid or return the cutoff value if valid. * * @param config The Flink configuration. @@ -151,8 +150,9 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable { * @return The parameters to start the TaskManager processes with. */ public static ContaineredTaskManagerParameters create( - Configuration config, long containerMemoryMB, int numSlots) - { + Configuration config, + long containerMemoryMB, + int numSlots) { // (1) try to compute how much memory used by container final long cutoffMB = calculateCutoffMB(config, containerMemoryMB); @@ -164,7 +164,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable { // (3) obtain the additional environment variables from the configuration final HashMap<String, String> envVars = new HashMap<>(); final String prefix = ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX; - + for (String key : config.keySet()) { if (key.startsWith(prefix) && key.length() > prefix.length()) { // remove prefix http://git-wip-us.apache.org/repos/asf/flink/blob/c6f91334/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java index 230a934..8537d17 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java @@ -98,10 +98,10 @@ public class ContaineredTaskManagerParametersTest extends TestLogger { * Test to guard {@link ContaineredTaskManagerParameters#calculateCutoffMB(Configuration, long)}. */ @Test - public void testCalculateCutoffMB() throws Exception { + public void testCalculateCutoffMB() { Configuration config = new Configuration(); - long containerMemoryMB = 1000; + long containerMemoryMB = 1000L; config.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.1f); config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 128); @@ -117,10 +117,9 @@ public class ContaineredTaskManagerParametersTest extends TestLogger { try { ContaineredTaskManagerParameters.calculateCutoffMB(config, containerMemoryMB); - } catch (IllegalArgumentException expected) { + fail("Expected to fail with an invalid argument exception."); + } catch (IllegalArgumentException ignored) { // we expected it. - return; } - fail(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c6f91334/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java index d3d5444..f6e7b07 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java @@ -44,7 +44,7 @@ public class TaskManagerServicesTest extends TestLogger { */ @SuppressWarnings("deprecation") @Test - public void calculateNetworkBufOld() throws Exception { + public void calculateNetworkBufOld() { Configuration config = new Configuration(); config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1); http://git-wip-us.apache.org/repos/asf/flink/blob/c6f91334/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index eab5e39..caf7a76 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -413,23 +413,23 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor /** * Method to validate cluster specification before deploy it, it will throw - * an {@link IllegalConfigurationException} if the {@link ClusterSpecification} is invalid. + * an {@link FlinkException} if the {@link ClusterSpecification} is invalid. + * + * @param clusterSpecification cluster specification to check against the configuration of the + * AbstractYarnClusterDescriptor + * @throws FlinkException if the cluster cannot be started with the provided {@link ClusterSpecification} */ - private void validateClusterSpecification(ClusterSpecification clusterSpecification) { - long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB(); - long cutoff; - try { - // We do the validation by calling the calculation methods here - cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize); - } catch (IllegalArgumentException cutoffConfigurationInvalidEx) { - throw new IllegalConfigurationException("Configurations related to cutoff checked failed.", cutoffConfigurationInvalidEx); - } - + private void validateClusterSpecification(ClusterSpecification clusterSpecification) throws FlinkException { try { + final long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB(); // We do the validation by calling the calculation methods here + // Internally these methods will check whether the cluster can be started with the provided + // ClusterSpecification and the configured memory requirements + final long cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize); TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration); - } catch (IllegalArgumentException heapSizeConfigurationInvalidEx) { - throw new IllegalConfigurationException("Configurations related to heap size checked failed.", heapSizeConfigurationInvalidEx); + } catch (IllegalArgumentException iae) { + throw new FlinkException("Cannot fulfill the minimum memory requirements with the provided " + + "cluster specification. Please increase the memory of the cluster.", iae); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c6f91334/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 1443f99..2311e87 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -31,7 +31,6 @@ import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.ApplicationStatus; -import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.security.SecurityConfiguration; http://git-wip-us.apache.org/repos/asf/flink/blob/c6f91334/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index 5b0d422..62110ed 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -260,13 +260,14 @@ public class FlinkYarnSessionCliTest extends TestLogger { @Test public void testCommandLineClusterSpecification() throws Exception { final Configuration configuration = new Configuration(); - configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, 1337); - configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, 7331); - configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2); - final int jobManagerMemory = 1337; final int taskManagerMemory = 7331; final int slotsPerTaskManager = 30; + + configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory); + configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, taskManagerMemory); + configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager); + final String[] args = {"-yjm", String.valueOf(jobManagerMemory), "-ytm", String.valueOf(taskManagerMemory), "-ys", String.valueOf(slotsPerTaskManager)}; final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( configuration,
