Repository: hadoop Updated Branches: refs/heads/trunk fce795101 -> 34f113df5
YARN-6714. IllegalStateException while handling APP_ATTEMPT_REMOVED event when async-scheduling enabled in CapacityScheduler. Contributed by Tao Yang. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/34f113df Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/34f113df Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/34f113df Branch: refs/heads/trunk Commit: 34f113df5cff2cc330fb671296932b8227b11975 Parents: fce7951 Author: Sunil G <sun...@apache.org> Authored: Tue Jul 11 14:52:44 2017 +0530 Committer: Sunil G <sun...@apache.org> Committed: Tue Jul 11 14:52:44 2017 +0530 ---------------------------------------------------------------------- .../scheduler/capacity/CapacityScheduler.java | 5 +- .../TestCapacitySchedulerAsyncScheduling.java | 149 +++++++++++++++++++ 2 files changed, 153 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/34f113df/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index d3186da..0d72860 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2392,7 +2392,10 @@ public class CapacityScheduler extends if (attemptId != null) { FiCaSchedulerApp app = getApplicationAttempt(attemptId); - if (app != null) { + // Required sanity check for attemptId - when async-scheduling enabled, + // proposal might be outdated if AM failover just finished + // and proposal queue was not consumed in time + if (app != null && attemptId.equals(app.getApplicationAttemptId())) { if (app.accept(cluster, request)) { app.apply(cluster, request); LOG.info("Allocation proposal accepted"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/34f113df/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 9854a15..0eb89d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -18,6 +18,12 @@ package
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -25,12 +31,29 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; 
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; public class TestCapacitySchedulerAsyncScheduling { @@ -140,4 +163,130 @@ public class TestCapacitySchedulerAsyncScheduling { rm.close(); } + + // Testcase for YARN-6714 + @Test (timeout = 30000) + public void testCommitProposalForFailedAppAttempt() + throws Exception { + // disable async-scheduling to simulate the complex scenario + Configuration disableAsyncConf = new Configuration(conf); + disableAsyncConf.setBoolean( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false); + + // init RM & NMs & Nodes + final MockRM rm = new MockRM(disableAsyncConf); + rm.start(); + final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB); + final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB); + List<MockNM> nmLst = new ArrayList<>(); + nmLst.add(nm1); + nmLst.add(nm2); + + // init scheduler & nodes + while ( + ((CapacityScheduler) rm.getRMContext().getScheduler()).getNodeTracker() + .nodeCount() < 2) { + Thread.sleep(10); + } + Assert.assertEquals(2, + ((AbstractYarnScheduler) rm.getRMContext().getScheduler()) + .getNodeTracker().nodeCount()); + CapacityScheduler scheduler = + (CapacityScheduler) rm.getRMContext().getScheduler(); + SchedulerNode sn1 = scheduler.getSchedulerNode(nm1.getNodeId()); + SchedulerNode sn2 = scheduler.getSchedulerNode(nm2.getNodeId()); + + // launch app + RMApp app = rm.submitApp(200, "app", "user", null, false, "default", + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + FiCaSchedulerApp
schedulerApp = + scheduler.getApplicationAttempt(am.getApplicationAttemptId()); + + // allocate and launch 1 container running on nm2 + allocateAndLaunchContainers(am, nm2, rm, 1, + Resources.createResource(5 * GB), 0, 2); + + // nm1 runs 1 container(app1-container_01/AM) + // nm2 runs 1 container(app1-container_02) + Assert.assertEquals(1, sn1.getNumContainers()); + Assert.assertEquals(1, sn2.getNumContainers()); + + // kill app attempt1 + scheduler.handle( + new AppAttemptRemovedSchedulerEvent(am.getApplicationAttemptId(), + RMAppAttemptState.KILLED, true)); + // wait until app attempt1 removed on nm1 + while (sn1.getCopiedListOfRunningContainers().size() == 1) { + Thread.sleep(100); + } + // wait until app attempt2 launched on nm1 + while (sn1.getCopiedListOfRunningContainers().size() == 0) { + nm1.nodeHeartbeat(true); + Thread.sleep(100); + } + + // generate reserved proposal of stopped app attempt + // and it could be committed for async-scheduling + // this kind of proposal should be skipped + Resource reservedResource = Resources.createResource(5 * GB); + Container container = Container.newInstance( + ContainerId.newContainerId(am.getApplicationAttemptId(), 3), + sn2.getNodeID(), sn2.getHttpAddress(), reservedResource, + Priority.newInstance(0), null); + RMContainer rmContainer = new RMContainerImpl(container, SchedulerRequestKey + .create(ResourceRequest + .newInstance(Priority.newInstance(0), "*", reservedResource, 1)), + am.getApplicationAttemptId(), sn2.getNodeID(), "user", + rm.getRMContext()); + SchedulerContainer reservedContainer = + new SchedulerContainer(schedulerApp, scheduler.getNode(sn2.getNodeID()), + rmContainer, "", false); + ContainerAllocationProposal reservedForAttempt1Proposal = + new ContainerAllocationProposal(reservedContainer, null, + reservedContainer, NodeType.OFF_SWITCH, NodeType.OFF_SWITCH, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, reservedResource); + List<ContainerAllocationProposal> reservedProposals = new
ArrayList<>(); + reservedProposals.add(reservedForAttempt1Proposal); + ResourceCommitRequest request = + new ResourceCommitRequest(null, reservedProposals, null); + scheduler.tryCommit(scheduler.getClusterResource(), request); + Assert.assertNull("Outdated proposal should not be accepted!", + sn2.getReservedContainer()); + + rm.stop(); + } + + + private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm, + int nContainer, Resource resource, int priority, int startContainerId) + throws Exception { + am.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(priority), "*", resource, + nContainer)), null); + ContainerId lastContainerId = ContainerId + .newContainerId(am.getApplicationAttemptId(), + startContainerId + nContainer - 1); + Assert.assertTrue( + rm.waitForState(nm, lastContainerId, RMContainerState.ALLOCATED)); + // Acquire them, and NM report RUNNING + am.allocate(null, null); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + for (int cId = startContainerId; + cId < startContainerId + nContainer; cId++) { + ContainerId containerId = + ContainerId.newContainerId(am.getApplicationAttemptId(), cId); + RMContainer rmContainer = cs.getRMContainer(containerId); + if (rmContainer != null) { + rmContainer.handle( + new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED)); + } else { + Assert.fail("Cannot find RMContainer"); + } + rm.waitForState(nm, + ContainerId.newContainerId(am.getApplicationAttemptId(), cId), + RMContainerState.RUNNING); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org