MAPREDUCE-6265. Make ContainerLauncherImpl.INITIAL_POOL_SIZE configurable to better control the launching/killing of containers. Contributed by Zhihai Xu
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9d38520c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9d38520c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9d38520c Branch: refs/heads/HDFS-7836 Commit: 9d38520c8e42530a817a7f69c9aa73a9ad40639c Parents: 32741cf Author: Tsuyoshi Ozawa <[email protected]> Authored: Sat Mar 14 16:44:02 2015 +0900 Committer: Tsuyoshi Ozawa <[email protected]> Committed: Sat Mar 14 16:44:02 2015 +0900 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 +++ .../v2/app/launcher/ContainerLauncherImpl.java | 14 +++++++++---- .../v2/app/launcher/TestContainerLauncher.java | 21 +++++++++++++++----- .../apache/hadoop/mapreduce/MRJobConfig.java | 8 ++++++++ .../src/main/resources/mapred-default.xml | 8 ++++++++ 5 files changed, 45 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d38520c/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 0bbe85c..ab6eef5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -340,6 +340,9 @@ Release 2.7.0 - UNRELEASED MAPREDUCE-6263. Configurable timeout between YARNRunner terminate the application and forcefully kill. (Eric Payne via junping_du) + MAPREDUCE-6265. Make ContainerLauncherImpl.INITIAL_POOL_SIZE configurable + to better control to launch/kill containers. (Zhihai Xu via ozawa) + OPTIMIZATIONS MAPREDUCE-6169. 
MergeQueue should release reference to the current item http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d38520c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 666f757..9c1125d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -70,7 +70,7 @@ public class ContainerLauncherImpl extends AbstractService implements new ConcurrentHashMap<ContainerId, Container>(); private final AppContext context; protected ThreadPoolExecutor launcherPool; - protected static final int INITIAL_POOL_SIZE = 10; + protected int initialPoolSize; private int limitOnPoolSize; private Thread eventHandlingThread; protected BlockingQueue<ContainerLauncherEvent> eventQueue = @@ -246,6 +246,12 @@ public class ContainerLauncherImpl extends AbstractService implements MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT); LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize); + + this.initialPoolSize = conf.getInt( + MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE, + MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE); + LOG.info("The thread pool initial size is " + 
this.initialPoolSize); + super.serviceInit(conf); cmProxy = new ContainerManagementProtocolProxy(conf); } @@ -256,7 +262,7 @@ public class ContainerLauncherImpl extends AbstractService implements "ContainerLauncher #%d").setDaemon(true).build(); // Start with a default core-pool size of 10 and change it dynamically. - launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE, + launcherPool = new ThreadPoolExecutor(initialPoolSize, Integer.MAX_VALUE, 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf); @@ -289,11 +295,11 @@ public class ContainerLauncherImpl extends AbstractService implements int idealPoolSize = Math.min(limitOnPoolSize, numNodes); if (poolSize < idealPoolSize) { - // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the + // Bump up the pool size to idealPoolSize+initialPoolSize, the // later is just a buffer so we are not always increasing the // pool-size int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize - + INITIAL_POOL_SIZE); + + initialPoolSize); LOG.info("Setting ContainerLauncher pool size to " + newPoolSize + " as number-of-nodes to talk to is " + numNodes); launcherPool.setCorePoolSize(newPoolSize); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d38520c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index dc1d72f..41ee65d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -90,7 +90,7 @@ public class TestContainerLauncher { static final Log LOG = LogFactory.getLog(TestContainerLauncher.class); - @Test (timeout = 5000) + @Test (timeout = 10000) public void testPoolSize() throws InterruptedException { ApplicationId appId = ApplicationId.newInstance(12345, 67); @@ -108,12 +108,14 @@ public class TestContainerLauncher { ThreadPoolExecutor threadPool = containerLauncher.getThreadPool(); // No events yet + Assert.assertEquals(containerLauncher.initialPoolSize, + MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE); Assert.assertEquals(0, threadPool.getPoolSize()); - Assert.assertEquals(ContainerLauncherImpl.INITIAL_POOL_SIZE, + Assert.assertEquals(containerLauncher.initialPoolSize, threadPool.getCorePoolSize()); Assert.assertNull(containerLauncher.foundErrors); - containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE; + containerLauncher.expectedCorePoolSize = containerLauncher.initialPoolSize; for (int i = 0; i < 10; i++) { ContainerId containerId = ContainerId.newContainerId(appAttemptId, i); TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, i); @@ -152,7 +154,7 @@ public class TestContainerLauncher { // Different hosts, there should be an increase in core-thread-pool size to // 21(11hosts+10buffer) // Core pool size should be 21 but the live pool size should be only 11. 
- containerLauncher.expectedCorePoolSize = 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE; + containerLauncher.expectedCorePoolSize = 11 + containerLauncher.initialPoolSize; containerLauncher.finishEventHandling = false; ContainerId containerId = ContainerId.newContainerId(appAttemptId, 21); TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21); @@ -164,6 +166,15 @@ public class TestContainerLauncher { Assert.assertNull(containerLauncher.foundErrors); containerLauncher.stop(); + + // change configuration MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE + // and verify initialPoolSize value. + Configuration conf = new Configuration(); + conf.setInt(MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE, + 20); + containerLauncher = new CustomContainerLauncher(context); + containerLauncher.init(conf); + Assert.assertEquals(containerLauncher.initialPoolSize, 20); } @Test(timeout = 5000) @@ -187,7 +198,7 @@ public class TestContainerLauncher { ThreadPoolExecutor threadPool = containerLauncher.getThreadPool(); // 10 different hosts - containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE; + containerLauncher.expectedCorePoolSize = containerLauncher.initialPoolSize; for (int i = 0; i < 10; i++) { containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, containerId, "host" + i + ":1234", null, http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d38520c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 9f671cd..3aa304a 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -504,6 +504,14 @@ public interface MRJobConfig { public static final int DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT = 500; + /** + * The initial size of thread pool to launch containers in the app master + */ + public static final String MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE = + MR_AM_PREFIX+"containerlauncher.threadpool-initial-size"; + public static final int DEFAULT_MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE = + 10; + /** Number of threads to handle job client RPC requests.*/ public static final String MR_AM_JOB_CLIENT_THREAD_COUNT = MR_AM_PREFIX + "job.client.thread-count"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d38520c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index d7bec9c..820c1ac 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1694,4 +1694,12 @@ calculated as (heapSize / mapreduce.heap.memory-mb.ratio). </description> </property> + +<property> + <name>yarn.app.mapreduce.am.containerlauncher.threadpool-initial-size</name> + <value>10</value> + <description>The initial size of thread pool to launch containers in the + app master. 
+ </description> +</property> </configuration>
