This is an automated email from the ASF dual-hosted git repository.
epayne pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new bdd396b YARN-8546. Resource leak caused by a reserved container being
released more than once under async scheduling. Contributed by Tao Yang.
bdd396b is described below
commit bdd396b26d421aea8747ad3254fc56e77f6fbefa
Author: Weiwei Yang <[email protected]>
AuthorDate: Wed Jul 25 17:35:27 2018 +0800
YARN-8546. Resource leak caused by a reserved container being released more
than once under async scheduling. Contributed by Tao Yang.
(cherry picked from commit 5be9f4a5d05c9cb99348719fe35626b1de3055db)
---
.../scheduler/common/fica/FiCaSchedulerApp.java | 15 ++++
.../TestCapacitySchedulerAsyncScheduling.java | 96 ++++++++++++++++++++++
2 files changed, 111 insertions(+)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 27a1684..d865e986 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -366,6 +366,21 @@ public class FiCaSchedulerApp extends
SchedulerApplicationAttempt {
.isEmpty()) {
for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
releaseContainer : allocation.getToRelease()) {
+ // Make sure to-release reserved containers are not outdated
+ if (releaseContainer.getRmContainer().getState()
+ == RMContainerState.RESERVED
+ && releaseContainer.getRmContainer() != releaseContainer
+ .getSchedulerNode().getReservedContainer()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to accept this proposal because "
+ + "it tries to release an outdated reserved container "
+ + releaseContainer.getRmContainer().getContainerId()
+ + " on node " + releaseContainer.getSchedulerNode().getNodeID()
+ + " whose reserved container is "
+ + releaseContainer.getSchedulerNode().getReservedContainer());
+ }
+ return false;
+ }
// Only consider non-reserved container (reserved container will
// not affect available resource of node) on the same node
if (releaseContainer.getRmContainer().getState()
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index c4e5493..6481f36 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -668,6 +669,101 @@ public class TestCapacitySchedulerAsyncScheduling {
rm.stop();
}
+
+ @Test(timeout = 60000)
+ public void testReleaseOutdatedReservedContainer() throws Exception {
+ /*
+ * Regression test for YARN-8546 (a reserved container being released
+ * more than once under async scheduling).
+ *
+ * Submit an application and reserve container_02 on nm1, then submit
+ * two allocate proposals which both contain the same reserved
+ * container_02 as the to-release container.
+ * The first proposal should be accepted; the second proposal should be
+ * rejected because it tries to release an outdated reserved container.
+ *
+ * Steps checked below:
+ * 1. After a node update for nm1, the app has exactly one reserved
+ * container and the default queue reports 9 GB used.
+ * 2. Proposal 1 allocates container id 3 on nm2 and releases the
+ * reservation on nm1; after the commit nm1 has no reserved container
+ * and nm2 has the requested memory allocated.
+ * 3. Proposal 2 allocates container id 4 on nm3 and releases the same
+ * reserved container again; after the commit nm3 must still have no
+ * memory allocated.
+ */
+ MockRM rm1 = new MockRM();
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+ MockNM nm3 = rm1.registerNode("h3:1234", 8 * GB);
+ rm1.drainEvents();
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ LeafQueue defaultQueue = (LeafQueue) cs.getQueue("default");
+ SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
+ SchedulerNode sn2 = cs.getSchedulerNode(nm2.getNodeId());
+ SchedulerNode sn3 = cs.getSchedulerNode(nm3.getNodeId());
+
+ // Submit an app to the default queue; its AM container should launch on nm1
+ RMApp app1 = rm1.submitApp(4 * GB, "app", "user", null, "default");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ Resource allocateResource = Resources.createResource(5 * GB);
+ am1.allocate("*", (int) allocateResource.getMemorySize(), 3, 0,
+ new ArrayList<ContainerId>(), "");
+ FiCaSchedulerApp schedulerApp1 =
+ cs.getApplicationAttempt(am1.getApplicationAttemptId());
+
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
+ Assert.assertEquals(9 * GB,
+ defaultQueue.getQueueResourceUsage().getUsed().getMemorySize());
+
+ RMContainer reservedContainer =
+ schedulerApp1.getReservedContainers().get(0);
+ ResourceCommitRequest allocateFromSameReservedContainerProposal1 =
+ createAllocateFromReservedProposal(3, allocateResource, schedulerApp1,
+ sn2, sn1, cs.getRMContext(), reservedContainer);
+ Assert.assertNotNull("Container should be reserved",
+ sn1.getReservedContainer());;
+ Assert.assertEquals("No memory should be used on " + sn2.getNodeName(),
+ 0, sn2.getAllocatedResource().getMemorySize());
+ cs.tryCommit(cs.getClusterResource(),
+ allocateFromSameReservedContainerProposal1);
+ Assert.assertNull("Container should have been unreserved",
+ sn1.getReservedContainer());;
+ Assert.assertEquals("Memory should be used on " + sn2.getNodeName(),
+ allocateResource.getMemorySize(),
+ sn2.getAllocatedResource().getMemorySize());
+ ResourceCommitRequest allocateFromSameReservedContainerProposal2 =
+ createAllocateFromReservedProposal(4, allocateResource, schedulerApp1,
+ sn3, sn1, cs.getRMContext(), reservedContainer);
+ cs.tryCommit(cs.getClusterResource(),
+ allocateFromSameReservedContainerProposal2);
+ Assert.assertFalse("This proposal should be rejected because "
+ + "it tries to release an outdated reserved container",
+ sn3.getAllocatedResource().getMemorySize() != 0);
+
+ rm1.close();
+ }
+ /**
+ * Builds a commit request holding a single allocation proposal that
+ * allocates a new container on {@code allocateNode} and lists
+ * {@code reservedContainer} (paired with {@code reservedNode}) as the only
+ * container to release.
+ *
+ * The new container is created with priority 0 for user "user", and the
+ * proposal uses NodeType.OFF_SWITCH for both node-type arguments together
+ * with SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY.
+ *
+ * @param containerId numeric id used to build the new container's
+ * ContainerId within the attempt of {@code schedulerApp}
+ * @param allocateResource resource of the new container; also passed as
+ * the resource argument of the proposal
+ * @param schedulerApp application attempt that owns both containers
+ * @param allocateNode node on which the new container is allocated
+ * @param reservedNode node paired with the reserved container in the
+ * to-release entry
+ * @param rmContext context handed to the RMContainerImpl constructor
+ * @param reservedContainer reserved container placed in the to-release list
+ * @return a ResourceCommitRequest wrapping the single proposal; its two
+ * remaining constructor arguments are passed as null
+ */
+ private ResourceCommitRequest createAllocateFromReservedProposal(
+ int containerId, Resource allocateResource, FiCaSchedulerApp
schedulerApp,
+ SchedulerNode allocateNode, SchedulerNode reservedNode,
+ RMContext rmContext, RMContainer reservedContainer) {
+ Container container = Container.newInstance(
+ ContainerId.newContainerId(schedulerApp.getApplicationAttemptId(),
containerId),
+ allocateNode.getNodeID(), allocateNode.getHttpAddress(),
allocateResource,
+ Priority.newInstance(0), null);
+ RMContainer rmContainer = new RMContainerImpl(container,
SchedulerRequestKey
+ .create(ResourceRequest
+ .newInstance(Priority.newInstance(0), "*", allocateResource, 1)),
+ schedulerApp.getApplicationAttemptId(), allocateNode.getNodeID(),
"user",
+ rmContext);
+ SchedulerContainer allocateContainer =
+ new SchedulerContainer(schedulerApp, allocateNode, rmContainer, "",
true);
+ SchedulerContainer reservedSchedulerContainer =
+ new SchedulerContainer(schedulerApp, reservedNode, reservedContainer,
"",
+ false);
+ List<SchedulerContainer> toRelease = new ArrayList<>();
+ toRelease.add(reservedSchedulerContainer);
+ ContainerAllocationProposal allocateFromReservedProposal =
+ new ContainerAllocationProposal(allocateContainer, toRelease, null,
+ NodeType.OFF_SWITCH, NodeType.OFF_SWITCH,
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, allocateResource);
+ List<ContainerAllocationProposal> allocateProposals = new ArrayList<>();
+ allocateProposals.add(allocateFromReservedProposal);
+ return new ResourceCommitRequest(allocateProposals, null, null);
+ }
private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
int nContainer, Resource resource, int priority, int startContainerId)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]