Repository: hadoop Updated Branches: refs/heads/trunk eccb7d46e -> 6a6a59db7
YARN-3415. Non-AM containers can be counted towards amResourceUsage of a fairscheduler queue (Zhihai Xu via Sandy Ryza) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6a6a59db Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6a6a59db Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6a6a59db Branch: refs/heads/trunk Commit: 6a6a59db7f1bfda47c3c14fb49676a7b22d2eb06 Parents: eccb7d4 Author: Sandy Ryza <[email protected]> Authored: Thu Apr 2 13:56:08 2015 -0700 Committer: Sandy Ryza <[email protected]> Committed: Thu Apr 2 13:56:08 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 ++ .../scheduler/fair/FSAppAttempt.java | 31 +++++++----- .../scheduler/fair/FSLeafQueue.java | 5 +- .../scheduler/fair/FairScheduler.java | 6 --- .../scheduler/fair/TestFairScheduler.java | 50 ++++++++++++++++---- 5 files changed, 64 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a6a59db/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 962e040..9a7de65 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -132,6 +132,9 @@ Release 2.8.0 - UNRELEASED YARN-3425. NPE from RMNodeLabelsManager.serviceStop when NodeLabelsManager.serviceInit failed. (Bibin A Chundatt via wangda) + YARN-3415. Non-AM containers can be counted towards amResourceUsage of a + Fair Scheduler queue (Zhihai Xu via Sandy Ryza) + Release 2.7.0 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a6a59db/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 46617ff..f0d1ed1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -523,8 +523,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // Inform the node node.allocateContainer(allocatedContainer); - // If this container is used to run AM, update the leaf queue's AM usage - if (getLiveContainers().size() == 1 && !getUnmanagedAM()) { + // If not running unmanaged, the first container we allocate is always + // the AM. Set the amResource for this app and update the leaf queue's AM + // usage + if (!isAmRunning() && !getUnmanagedAM()) { + setAMResource(container.getResource()); getQueue().addAMResourceUsage(container.getResource()); setAmRunning(true); } @@ -551,6 +554,19 @@ public class FSAppAttempt extends SchedulerApplicationAttempt LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved); } + // Check the AM resource usage for the leaf queue + if (!isAmRunning() && !getUnmanagedAM()) { + List<ResourceRequest> ask = appSchedulingInfo.getAllResourceRequests(); + if (ask.isEmpty() || !getQueue().canRunAppAM( + ask.get(0).getCapability())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping allocation because maxAMShare limit would " + + "be exceeded"); + } + return Resources.none(); + } + } + Collection<Priority> prioritiesToTry = (reserved) ? Arrays.asList(node.getReservedContainer().getReservedPriority()) : getPriorities(); @@ -567,17 +583,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt addSchedulingOpportunity(priority); - // Check the AM resource usage for the leaf queue - if (getLiveContainers().size() == 0 && !getUnmanagedAM()) { - if (!getQueue().canRunAppAM(getAMResource())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping allocation because maxAMShare limit would " + - "be exceeded"); - } - return Resources.none(); - } - } - ResourceRequest rackLocalRequest = getResourceRequest(priority, node.getRackName()); ResourceRequest localRequest = getResourceRequest(priority, http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a6a59db/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index c49a323..04dbd2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -124,8 +124,9 @@ public class FSLeafQueue extends FSQueue { writeLock.unlock(); } - // Update AM resource usage if needed - if (runnable && app.isAmRunning() && app.getAMResource() != null) { + // Update AM resource usage if needed. If isAMRunning is true, we're not + // running an unmanaged AM. + if (runnable && app.isAmRunning()) { Resources.subtractFrom(amResourceUsage, app.getAMResource()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a6a59db/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 04c7f70..a6c5416 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -901,12 +901,6 @@ public class FairScheduler extends // Record container allocation start time application.recordContainerRequestTime(getClock().getTime()); - // Set amResource for this app - if (!application.getUnmanagedAM() && ask.size() == 1 - && application.getLiveContainers().isEmpty()) { - application.setAMResource(ask.get(0).getCapability()); - } - // Release containers releaseContainers(release, application); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a6a59db/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index ff215cd..b5bfb8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtil import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -3548,8 +3549,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); scheduler.update(); scheduler.handle(updateEvent); - assertEquals("Application3's AM requests 1024 MB memory", - 1024, app3.getAMResource().getMemory()); + assertEquals("Application3's AM resource shouldn't be updated", + 0, app3.getAMResource().getMemory()); assertEquals("Application3's AM should not be running", 0, app3.getLiveContainers().size()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", @@ -3574,6 +3575,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { 0, app1.getLiveContainers().size()); assertEquals("Application3's AM should be running", 1, app3.getLiveContainers().size()); + assertEquals("Application3's AM requests 1024 MB memory", + 1024, app3.getAMResource().getMemory()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemory()); @@ -3584,8 +3587,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { FSAppAttempt app4 = scheduler.getSchedulerApp(attId4); scheduler.update(); scheduler.handle(updateEvent); - assertEquals("Application4's AM requests 2048 MB memory", - 2048, app4.getAMResource().getMemory()); + assertEquals("Application4's AM resource shouldn't be updated", + 0, app4.getAMResource().getMemory()); assertEquals("Application4's AM should not be running", 0, app4.getLiveContainers().size()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", @@ -3598,8 +3601,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { FSAppAttempt app5 = scheduler.getSchedulerApp(attId5); scheduler.update(); scheduler.handle(updateEvent); - assertEquals("Application5's AM requests 2048 MB memory", - 2048, app5.getAMResource().getMemory()); + assertEquals("Application5's AM resource shouldn't be updated", + 0, app5.getAMResource().getMemory()); assertEquals("Application5's AM should not be running", 0, app5.getLiveContainers().size()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", @@ -3631,6 +3634,33 @@ public class TestFairScheduler extends FairSchedulerTestBase { 0, app3.getLiveContainers().size()); assertEquals("Application5's AM should be running", 1, app5.getLiveContainers().size()); + assertEquals("Application5's AM requests 2048 MB memory", + 2048, app5.getAMResource().getMemory()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + // request non-AM container for app5 + createSchedulingRequestExistingApplication(1024, 1, attId5); + assertEquals("Application5's AM should have 1 container", + 1, app5.getLiveContainers().size()); + // complete AM container before non-AM container is allocated. + // spark application hit this situation. + RMContainer amContainer5 = (RMContainer)app5.getLiveContainers().toArray()[0]; + ContainerExpiredSchedulerEvent containerExpired = + new ContainerExpiredSchedulerEvent(amContainer5.getContainerId()); + scheduler.handle(containerExpired); + assertEquals("Application5's AM should have 0 container", + 0, app5.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + scheduler.update(); + scheduler.handle(updateEvent); + // non-AM container should be allocated + // check non-AM container allocation is not rejected + // due to queue MaxAMShare limitation. + assertEquals("Application5 should have 1 container", + 1, app5.getLiveContainers().size()); + // check non-AM container allocation won't affect queue AmResourceUsage assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemory()); @@ -3643,8 +3673,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.handle(updateEvent); assertEquals("Application6's AM should not be running", 0, app6.getLiveContainers().size()); - assertEquals("Application6's AM requests 2048 MB memory", - 2048, app6.getAMResource().getMemory()); + assertEquals("Application6's AM resource shouldn't be updated", + 0, app6.getAMResource().getMemory()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemory()); @@ -3748,8 +3778,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); scheduler.update(); scheduler.handle(updateEvent); - assertEquals("Application2's AM requests 1024 MB memory", - 1024, app2.getAMResource().getMemory()); + assertEquals("Application2's AM resource shouldn't be updated", + 0, app2.getAMResource().getMemory()); assertEquals("Application2's AM should not be running", 0, app2.getLiveContainers().size()); assertEquals("Queue2's AM resource usage should be 0 MB memory",
