[FLINK-4139][yarn] Adjust task slots to parallelism correctly

- user specifies no parallelism -> parallelism is adjusted to #taskSlots * #nodes.
- user specifies parallelism but no #taskSlots or too few slots -> #taskSlots are set such that they meet the parallelism Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44b3bc45 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44b3bc45 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44b3bc45 Branch: refs/heads/master Commit: 44b3bc45b382c1f2783e9c17dd76ea2e9bbb40ec Parents: 96590ff Author: Maximilian Michels <[email protected]> Authored: Tue Jun 28 16:41:39 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Fri Jul 1 15:22:31 2016 +0200 ---------------------------------------------------------------------- .../CliFrontendAddressConfigurationTest.java | 6 +- ...CliFrontendYarnAddressConfigurationTest.java | 1 - .../flink/yarn/FlinkYarnSessionCliTest.java | 95 ++++++++++++++++++++ .../flink/yarn/cli/FlinkYarnSessionCli.java | 4 +- 4 files changed, 101 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/44b3bc45/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java index c6b1111..0119dbe 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java @@ -78,10 +78,10 @@ public class CliFrontendAddressConfigurationTest { @Test(expected = IllegalConfigurationException.class) public void testInvalidConfigAndNoOption() throws Exception { - CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir()); - 
RunOptions options = CliFrontendParser.parseRunCommand(new String[] {}); + CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir()); + RunOptions options = CliFrontendParser.parseRunCommand(new String[] {}); - frontend.retrieveClient(options); + frontend.retrieveClient(options); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/44b3bc45/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java index 24d8aa5..5c10de8 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java @@ -276,7 +276,6 @@ public class CliFrontendYarnAddressConfigurationTest { } - /////////// // Utils // /////////// http://git-wip-us.apache.org/repos/asf/flink/blob/44b3bc45/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index f71dd63..2d08bee 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -18,21 +18,34 @@ package org.apache.flink.yarn; +import akka.actor.ActorSystem; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import 
org.apache.flink.client.CliFrontend; +import org.apache.flink.client.cli.CliFrontendParser; +import org.apache.flink.client.cli.RunOptions; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.test.util.TestBaseUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; import java.io.File; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; @@ -73,4 +86,86 @@ public class FlinkYarnSessionCliTest { Assert.assertEquals(1, dynProperties.size()); Assert.assertEquals("5 min", dynProperties.get("akka.ask.timeout")); } + + @Test + public void testNotEnoughTaskSlots() throws Exception { + + File confFile = tmp.newFile("flink-conf.yaml"); + File jarFile = tmp.newFile("test.jar"); + new CliFrontend(tmp.getRoot().getAbsolutePath()); + + String[] params = + new String[] {"-yn", "2", "-ys", "3", "-p", "7", jarFile.getAbsolutePath()}; + + RunOptions runOptions = CliFrontendParser.parseRunCommand(params); + + FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn"); + + AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine()); + + // each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased. 
+ Assert.assertEquals(4, descriptor.getTaskManagerSlots()); + Assert.assertEquals(2, descriptor.getTaskManagerCount()); + } + + @Test + public void testCorrectSettingOfMaxSlots() throws Exception { + + File confFile = tmp.newFile("flink-conf.yaml"); + File jarFile = tmp.newFile("test.jar"); + new CliFrontend(tmp.getRoot().getAbsolutePath()); + + String[] params = + new String[] {"-yn", "2", "-ys", "3", jarFile.getAbsolutePath()}; + + RunOptions runOptions = CliFrontendParser.parseRunCommand(params); + + FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn"); + + AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine()); + + // each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased. + Assert.assertEquals(3, descriptor.getTaskManagerSlots()); + Assert.assertEquals(2, descriptor.getTaskManagerCount()); + + Configuration config = new Configuration(); + CliFrontend.setJobManagerAddressInConfig(config, new InetSocketAddress("test", 9000)); + ClusterClient client = new TestingYarnClusterClient(descriptor, config); + Assert.assertEquals(6, client.getMaxSlots()); + } + + private static class TestCLI extends FlinkYarnSessionCli { + + public TestCLI(String shortPrefix, String longPrefix) { + super(shortPrefix, longPrefix); + } + + private static class JarAgnosticClusterDescriptor extends YarnClusterDescriptor { + @Override + public void setLocalJarPath(Path localJarPath) { +// setLocalJarPath("/tmp"); + } + } + + @Override + protected AbstractYarnClusterDescriptor getClusterDescriptor() { + return new JarAgnosticClusterDescriptor(); + } + } + + private static class TestingYarnClusterClient extends YarnClusterClient { + + public TestingYarnClusterClient(AbstractYarnClusterDescriptor descriptor, Configuration config) throws IOException, YarnException { + super(descriptor, + Mockito.mock(YarnClient.class), + Mockito.mock(ApplicationReport.class), + config, + new Path("/tmp"), true); + } + + @Override 
+ protected ActorSystem createActorSystem() throws IOException { + return Mockito.mock(ActorSystem.class); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/44b3bc45/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index d59657d..cf70406 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -30,6 +30,7 @@ import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterClient; @@ -332,12 +333,13 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> int yarnTmSlots = yarnClusterDescriptor.getTaskManagerSlots(); if (yarnTmSlots == -1) { yarnTmSlots = 1; + yarnClusterDescriptor.setTaskManagerSlots(yarnTmSlots); } int maxSlots = yarnTmSlots * yarnClusterDescriptor.getTaskManagerCount(); int userParallelism = Integer.valueOf(cmd.getOptionValue(CliFrontendParser.PARALLELISM_OPTION.getOpt(), "-1")); if (userParallelism != -1) { - int slotsPerTM = userParallelism / yarnClusterDescriptor.getTaskManagerCount(); + int slotsPerTM = (int) Math.ceil((double) userParallelism / yarnClusterDescriptor.getTaskManagerCount()); String message = "The YARN cluster has " + maxSlots + " slots available, " + "but the user requested a parallelism of " + userParallelism + " on YARN. 
" + "Each of the " + yarnClusterDescriptor.getTaskManagerCount() + " TaskManagers " +
