[FLINK-4139][yarn] adjust task slots to parallelism correctly

- user specifies no parallelism
  -> parallelism is adjusted to #taskSlots * #nodes.

- user specifies parallelism but no #taskSlots or too few slots
  -> #taskSlots are set such that they meet the parallelism


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44b3bc45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44b3bc45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44b3bc45

Branch: refs/heads/master
Commit: 44b3bc45b382c1f2783e9c17dd76ea2e9bbb40ec
Parents: 96590ff
Author: Maximilian Michels <[email protected]>
Authored: Tue Jun 28 16:41:39 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Fri Jul 1 15:22:31 2016 +0200

----------------------------------------------------------------------
 .../CliFrontendAddressConfigurationTest.java    |  6 +-
 ...CliFrontendYarnAddressConfigurationTest.java |  1 -
 .../flink/yarn/FlinkYarnSessionCliTest.java     | 95 ++++++++++++++++++++
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  4 +-
 4 files changed, 101 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44b3bc45/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
index c6b1111..0119dbe 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
@@ -78,10 +78,10 @@ public class CliFrontendAddressConfigurationTest {
 
        @Test(expected = IllegalConfigurationException.class)
        public void testInvalidConfigAndNoOption() throws Exception {
-                       CliFrontend frontend = new 
CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
-                       RunOptions options = 
CliFrontendParser.parseRunCommand(new String[] {});
+               CliFrontend frontend = new 
CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
+               RunOptions options = CliFrontendParser.parseRunCommand(new 
String[] {});
 
-                       frontend.retrieveClient(options);
+               frontend.retrieveClient(options);
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/44b3bc45/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 24d8aa5..5c10de8 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -276,7 +276,6 @@ public class CliFrontendYarnAddressConfigurationTest {
 
        }
 
-
        ///////////
        // Utils //
        ///////////

http://git-wip-us.apache.org/repos/asf/flink/blob/44b3bc45/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index f71dd63..2d08bee 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -18,21 +18,34 @@
 
 package org.apache.flink.yarn;
 
+import akka.actor.ActorSystem;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.RunOptions;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.test.util.TestBaseUtils;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -73,4 +86,86 @@ public class FlinkYarnSessionCliTest {
                Assert.assertEquals(1, dynProperties.size());
                Assert.assertEquals("5 min", 
dynProperties.get("akka.ask.timeout"));
        }
+
+       @Test
+       public void testNotEnoughTaskSlots() throws Exception {
+
+               File confFile = tmp.newFile("flink-conf.yaml");
+               File jarFile = tmp.newFile("test.jar");
+               new CliFrontend(tmp.getRoot().getAbsolutePath());
+
+               String[] params =
+                       new String[] {"-yn", "2", "-ys", "3", "-p", "7", 
jarFile.getAbsolutePath()};
+
+               RunOptions runOptions = 
CliFrontendParser.parseRunCommand(params);
+
+               FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
+
+               AbstractYarnClusterDescriptor descriptor = 
yarnCLI.createDescriptor("", runOptions.getCommandLine());
+
+               // each task manager has 3 slots but the parallelism is 7. Thus 
the slots should be increased.
+               Assert.assertEquals(4, descriptor.getTaskManagerSlots());
+               Assert.assertEquals(2, descriptor.getTaskManagerCount());
+       }
+
+       @Test
+       public void testCorrectSettingOfMaxSlots() throws Exception {
+
+               File confFile = tmp.newFile("flink-conf.yaml");
+               File jarFile = tmp.newFile("test.jar");
+               new CliFrontend(tmp.getRoot().getAbsolutePath());
+
+               String[] params =
+                       new String[] {"-yn", "2", "-ys", "3", 
jarFile.getAbsolutePath()};
+
+               RunOptions runOptions = 
CliFrontendParser.parseRunCommand(params);
+
+               FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
+
+               AbstractYarnClusterDescriptor descriptor = 
yarnCLI.createDescriptor("", runOptions.getCommandLine());
+
+               // no parallelism given: the 3 slots per task manager are kept.
+               Assert.assertEquals(3, descriptor.getTaskManagerSlots());
+               Assert.assertEquals(2, descriptor.getTaskManagerCount());
+
+               Configuration config = new Configuration();
+               CliFrontend.setJobManagerAddressInConfig(config, new 
InetSocketAddress("test", 9000));
+               ClusterClient client = new TestingYarnClusterClient(descriptor, 
config);
+               Assert.assertEquals(6, client.getMaxSlots());
+       }
+
+       private static class TestCLI extends FlinkYarnSessionCli {
+
+               public TestCLI(String shortPrefix, String longPrefix) {
+                       super(shortPrefix, longPrefix);
+               }
+
+               private static class JarAgnosticClusterDescriptor extends 
YarnClusterDescriptor {
+                       @Override
+                       public void setLocalJarPath(Path localJarPath) {
+//                             setLocalJarPath("/tmp");
+                       }
+               }
+
+               @Override
+               protected AbstractYarnClusterDescriptor getClusterDescriptor() {
+                       return new JarAgnosticClusterDescriptor();
+               }
+       }
+
+       private static class TestingYarnClusterClient extends YarnClusterClient 
{
+
+               public TestingYarnClusterClient(AbstractYarnClusterDescriptor 
descriptor, Configuration config) throws IOException, YarnException {
+                       super(descriptor,
+                               Mockito.mock(YarnClient.class),
+                               Mockito.mock(ApplicationReport.class),
+                               config,
+                               new Path("/tmp"), true);
+               }
+
+               @Override
+               protected ActorSystem createActorSystem() throws IOException {
+                       return Mockito.mock(ActorSystem.class);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44b3bc45/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index d59657d..cf70406 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -30,6 +30,7 @@ import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
@@ -332,12 +333,13 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                int yarnTmSlots = yarnClusterDescriptor.getTaskManagerSlots();
                if (yarnTmSlots == -1) {
                        yarnTmSlots = 1;
+                       yarnClusterDescriptor.setTaskManagerSlots(yarnTmSlots);
                }
 
                int maxSlots = yarnTmSlots * 
yarnClusterDescriptor.getTaskManagerCount();
                int userParallelism = 
Integer.valueOf(cmd.getOptionValue(CliFrontendParser.PARALLELISM_OPTION.getOpt(),
 "-1"));
                if (userParallelism != -1) {
-                       int slotsPerTM = userParallelism / 
yarnClusterDescriptor.getTaskManagerCount();
+                       int slotsPerTM = (int) Math.ceil((double) 
userParallelism / yarnClusterDescriptor.getTaskManagerCount());
                        String message = "The YARN cluster has " + maxSlots + " 
slots available, " +
                                "but the user requested a parallelism of " + 
userParallelism + " on YARN. " +
                                "Each of the " + 
yarnClusterDescriptor.getTaskManagerCount() + " TaskManagers " +

Reply via email to