[FLINK-9028] [yarn] Perform parameter checking before starting the Yarn cluster
This closes #5726. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fecc1908 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fecc1908 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fecc1908 Branch: refs/heads/release-1.5 Commit: fecc19088b36fc4c8bca5ff39ba756f8fd111171 Parents: bfd8229 Author: sihuazhou <[email protected]> Authored: Tue Mar 20 15:59:33 2018 +0800 Committer: Till Rohrmann <[email protected]> Committed: Wed Mar 21 10:58:39 2018 +0100 ---------------------------------------------------------------------- .../ContaineredTaskManagerParameters.java | 41 ++++++++++++++------ .../flink/runtime/dispatcher/Dispatcher.java | 2 +- .../ContaineredTaskManagerParametersTest.java | 32 +++++++++++++++ .../taskexecutor/TaskManagerServicesTest.java | 1 - .../yarn/AbstractYarnClusterDescriptor.java | 27 +++++++++++++ .../flink/yarn/cli/FlinkYarnSessionCli.java | 3 +- .../flink/yarn/FlinkYarnSessionCliTest.java | 4 +- .../flink/yarn/YarnClusterDescriptorTest.java | 15 ++++--- 8 files changed, 103 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fecc1908/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java index c4dd486..fa7fdf4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.clusterframework; import 
org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.runtime.taskexecutor.TaskManagerServices; +import org.apache.flink.util.Preconditions; import java.util.HashMap; import java.util.Map; @@ -101,46 +102,62 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable { // ------------------------------------------------------------------------ // Factory // ------------------------------------------------------------------------ - + /** - * Computes the parameters to be used to start a TaskManager Java process. + * calcuate cutoff memory size used by container, it will throw an {@link IllegalArgumentException} + * if the config is invalid or return the cutoff value if valid. * * @param config The Flink configuration. * @param containerMemoryMB The size of the complete container, in megabytes. - * @return The parameters to start the TaskManager processes with. + * + * @return cutoff memory size used by container. */ - public static ContaineredTaskManagerParameters create( - Configuration config, long containerMemoryMB, int numSlots) - { - // (1) compute how much memory we subtract from the total memory, to get the Java memory + public static long calculateCutoffMB(Configuration config, long containerMemoryMB) { + Preconditions.checkArgument(containerMemoryMB > 0); + // (1) check cutoff ratio final float memoryCutoffRatio = config.getFloat( ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO); - final int minCutoff = config.getInteger( - ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN); - if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) { throw new IllegalArgumentException("The configuration value '" + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "' must be between 0 and 1. 
Value given=" + memoryCutoffRatio); } + // (2) check min cutoff value + final int minCutoff = config.getInteger( + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN); + if (minCutoff >= containerMemoryMB) { throw new IllegalArgumentException("The configuration value '" + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + "'='" + minCutoff + "' is larger than the total container memory " + containerMemoryMB); } + // (3) check between heap and off-heap long cutoff = (long) (containerMemoryMB * memoryCutoffRatio); if (cutoff < minCutoff) { cutoff = minCutoff; } + return cutoff; + } - final long javaMemorySizeMB = containerMemoryMB - cutoff; + /** + * Computes the parameters to be used to start a TaskManager Java process. + * + * @param config The Flink configuration. + * @param containerMemoryMB The size of the complete container, in megabytes. + * @return The parameters to start the TaskManager processes with. + */ + public static ContaineredTaskManagerParameters create( + Configuration config, long containerMemoryMB, int numSlots) + { + // (1) try to compute how much memory used by container + final long cutoffMB = calculateCutoffMB(config, containerMemoryMB); // (2) split the remaining Java memory between heap and off-heap - final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(javaMemorySizeMB, config); + final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(containerMemoryMB - cutoffMB, config); // use the cut-off memory for off-heap (that was its intention) final long offHeapSizeMB = containerMemoryMB - heapSizeMB; http://git-wip-us.apache.org/repos/asf/flink/blob/fecc1908/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 91a4f73..68b4046 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -313,7 +313,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme @Override public CompletableFuture<Collection<JobID>> listJobs(Time timeout) { if (jobManagerRunners.isEmpty()) { - System.out.println("empty"); + log.info("empty"); } return CompletableFuture.completedFuture( Collections.unmodifiableSet(new HashSet<>(jobManagerRunners.keySet()))); http://git-wip-us.apache.org/repos/asf/flink/blob/fecc1908/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java index 8d9ea88..230a934 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -27,6 +28,7 @@ import static org.apache.flink.configuration.TaskManagerOptions.MEMORY_OFF_HEAP; import static org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBufferMemory; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class ContaineredTaskManagerParametersTest extends TestLogger 
{ private static final long CONTAINER_MEMORY = 8192L; @@ -91,4 +93,34 @@ public class ContaineredTaskManagerParametersTest extends TestLogger { assertTrue(params.taskManagerHeapSizeMB() + params.taskManagerDirectMemoryLimitMB() <= CONTAINER_MEMORY); } + + /** + * Test to guard {@link ContaineredTaskManagerParameters#calculateCutoffMB(Configuration, long)}. + */ + @Test + public void testCalculateCutoffMB() throws Exception { + + Configuration config = new Configuration(); + long containerMemoryMB = 1000; + + config.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.1f); + config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 128); + + assertEquals(128, + ContaineredTaskManagerParameters.calculateCutoffMB(config, containerMemoryMB)); + + config.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.2f); + assertEquals(200, + ContaineredTaskManagerParameters.calculateCutoffMB(config, containerMemoryMB)); + + config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 1000); + + try { + ContaineredTaskManagerParameters.calculateCutoffMB(config, containerMemoryMB); + } catch (IllegalArgumentException expected) { + // we expected it. 
+ return; + } + fail(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/fecc1908/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java index b0c6c60..d3d5444 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java @@ -214,5 +214,4 @@ public class TaskManagerServicesTest extends TestLogger { config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.1f); // 10% assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, config)); } - } http://git-wip-us.apache.org/repos/asf/flink/blob/fecc1908/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index bdb471a..eab5e39 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -36,9 +36,11 @@ import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.jobgraph.JobGraph; import 
org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.ShutdownHookUtil; @@ -410,6 +412,28 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } /** + * Method to validate cluster specification before deploy it, it will throw + * an {@link IllegalConfigurationException} if the {@link ClusterSpecification} is invalid. + */ + private void validateClusterSpecification(ClusterSpecification clusterSpecification) { + long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB(); + long cutoff; + try { + // We do the validation by calling the calculation methods here + cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize); + } catch (IllegalArgumentException cutoffConfigurationInvalidEx) { + throw new IllegalConfigurationException("Configurations related to cutoff checked failed.", cutoffConfigurationInvalidEx); + } + + try { + // We do the validation by calling the calculation methods here + TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration); + } catch (IllegalArgumentException heapSizeConfigurationInvalidEx) { + throw new IllegalConfigurationException("Configurations related to heap size checked failed.", heapSizeConfigurationInvalidEx); + } + } + + /** * This method will block until the ApplicationMaster/JobManager have been deployed on YARN. 
* * @param clusterSpecification Initial cluster specification for the Flink cluster to be deployed @@ -423,6 +447,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor @Nullable JobGraph jobGraph, boolean detached) throws Exception { + // ------------------ Check if configuration is valid -------------------- + validateClusterSpecification(clusterSpecification); + if (UserGroupInformation.isSecurityEnabled()) { // note: UGI::hasKerberosCredentials inaccurately reports false // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786), http://git-wip-us.apache.org/repos/asf/flink/blob/fecc1908/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 2cdc19d..1443f99 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -31,6 +31,7 @@ import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.security.SecurityConfiguration; @@ -631,7 +632,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId if (detachedMode) { LOG.info("The Flink YARN client has been started in detached mode. 
In order to stop " + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + - "yarn application -kill " + applicationId.getOpt()); + "yarn application -kill " + yarnApplicationId); } else { ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); http://git-wip-us.apache.org/repos/asf/flink/blob/fecc1908/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index 20ce314..5b0d422 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -264,8 +264,8 @@ public class FlinkYarnSessionCliTest extends TestLogger { configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, 7331); configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2); - final int jobManagerMemory = 42; - final int taskManagerMemory = 41; + final int jobManagerMemory = 1337; + final int taskManagerMemory = 7331; final int slotsPerTaskManager = 30; final String[] args = {"-yjm", String.valueOf(jobManagerMemory), "-ytm", String.valueOf(taskManagerMemory), "-ys", String.valueOf(slotsPerTaskManager)}; final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( http://git-wip-us.apache.org/repos/asf/flink/blob/fecc1908/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java index dd8b625..52bf8bb 100644 --- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.util.TestLogger; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; @@ -91,8 +92,11 @@ public class YarnClusterDescriptorTest extends TestLogger { @Test public void testFailIfTaskSlotsHigherThanMaxVcores() throws ClusterDeploymentException { + final Configuration flinkConfiguration = new Configuration(); + flinkConfiguration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0); + YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor( - new Configuration(), + flinkConfiguration, yarnConfiguration, temporaryFolder.getRoot().getAbsolutePath(), yarnClient, @@ -101,8 +105,8 @@ public class YarnClusterDescriptorTest extends TestLogger { clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath())); ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder() - .setMasterMemoryMB(-1) - .setTaskManagerMemoryMB(-1) + .setMasterMemoryMB(1) + .setTaskManagerMemoryMB(1) .setNumberTaskManagers(1) .setSlotsPerTaskManager(Integer.MAX_VALUE) .createClusterSpecification(); @@ -126,6 +130,7 @@ public class YarnClusterDescriptorTest extends TestLogger { Configuration configuration = new Configuration(); // overwrite vcores in config configuration.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE); + configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0); YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor( configuration, @@ 
-138,8 +143,8 @@ public class YarnClusterDescriptorTest extends TestLogger { // configure slots ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder() - .setMasterMemoryMB(-1) - .setTaskManagerMemoryMB(-1) + .setMasterMemoryMB(1) + .setTaskManagerMemoryMB(1) .setNumberTaskManagers(1) .setSlotsPerTaskManager(1) .createClusterSpecification();
