This is an automated email from the ASF dual-hosted git repository.
wwei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c4e39e3 YARN-9164. Shutdown NM may cause NPE when opportunistic
container scheduling is enabled. Contributed by lujie.
c4e39e3 is described below
commit c4e39e3a59b7e47ad6c269131c122ec14f8746ec
Author: Weiwei Yang <[email protected]>
AuthorDate: Thu Jan 3 23:56:28 2019 +0800
YARN-9164. Shutdown NM may cause NPE when opportunistic container
scheduling is enabled. Contributed by lujie.
(cherry picked from commit cfe89e6f963ba25b5fff1ce48cad36d74b3c789c)
---
.../OpportunisticContainerAllocatorAMService.java | 8 ++-
.../scheduler/AbstractYarnScheduler.java | 12 ++--
.../resourcemanager/scheduler/SchedulerUtils.java | 8 ++-
...stOpportunisticContainerAllocatorAMService.java | 78 ++++++++++++++++++++++
4 files changed, 97 insertions(+), 9 deletions(-)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index ce425df..3b814d7 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -336,9 +336,11 @@ public class OpportunisticContainerAllocatorAMService
RMContainer rmContainer =
SchedulerUtils.createOpportunisticRmContainer(
rmContext, container, isRemotelyAllocated);
- rmContainer.handle(
- new RMContainerEvent(container.getId(),
- RMContainerEventType.ACQUIRED));
+ if (rmContainer!=null) {
+ rmContainer.handle(
+ new RMContainerEvent(container.getId(),
+ RMContainerEventType.ACQUIRED));
+ }
}
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 50e7b10..e3e157e 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -672,8 +672,10 @@ public abstract class AbstractYarnScheduler
LOG.debug("Completed container: " + rmContainer.getContainerId() +
" in state: " + rmContainer.getState() + " event:" + event);
}
- getSchedulerNode(rmContainer.getNodeId()).releaseContainer(
- rmContainer.getContainerId(), false);
+ SchedulerNode node = getSchedulerNode(rmContainer.getNodeId());
+ if (node != null) {
+ node.releaseContainer(rmContainer.getContainerId(), false);
+ }
}
// If the container is getting killed in ACQUIRED state, the requester (AM
@@ -1229,8 +1231,10 @@ public abstract class AbstractYarnScheduler
uReq.getContainerUpdateType()) {
RMContainer demotedRMContainer =
createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
- appAttempt.addToNewlyDemotedContainers(
- uReq.getContainerId(), demotedRMContainer);
+ if (demotedRMContainer != null) {
+ appAttempt.addToNewlyDemotedContainers(
+ uReq.getContainerId(), demotedRMContainer);
+ }
} else {
RMContainer demotedRMContainer = createDecreasedRMContainer(
appAttempt, uReq, rmContainer);
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 041a762..3275ed2 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -394,6 +394,11 @@ public class SchedulerUtils {
public static RMContainer createOpportunisticRmContainer(RMContext rmContext,
Container container, boolean isRemotelyAllocated) {
+ SchedulerNode node = ((AbstractYarnScheduler) rmContext.getScheduler())
+ .getNode(container.getNodeId());
+ if (node == null) {
+ return null;
+ }
SchedulerApplicationAttempt appAttempt =
((AbstractYarnScheduler) rmContext.getScheduler())
.getCurrentAttemptForContainer(container.getId());
@@ -402,8 +407,7 @@ public class SchedulerUtils {
appAttempt.getApplicationAttemptId(), container.getNodeId(),
appAttempt.getUser(), rmContext, isRemotelyAllocated);
appAttempt.addRMContainer(container.getId(), rmContainer);
- ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
- container.getNodeId()).allocateContainer(rmContainer);
+ node.allocateContainer(rmContainer);
return rmContainer;
}
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 1af930f..023f16e 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
@@ -72,14 +73,19 @@ import
org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistrib
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -88,6 +94,7 @@ import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
.FifoScheduler;
import
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
@@ -95,12 +102,17 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import com.google.common.base.Supplier;
+
+import static org.junit.Assert.fail;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.TimeoutException;
/**
* Test cases for {@link OpportunisticContainerAllocatorAMService}.
@@ -797,6 +809,72 @@ public class TestOpportunisticContainerAllocatorAMService {
Assert.assertEquals(1, ctxt.getNodeMap().size());
}
+ @Test(timeout = 60000)
+ public void testAppAttemptRemovalAfterNodeRemoval() throws Exception {
+ MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService());
+ nm.registerNode();
+ OpportunisticContainerAllocatorAMService amservice =
+ (OpportunisticContainerAllocatorAMService) rm
+ .getApplicationMasterService();
+ RMApp app = rm.submitApp(1 * GB, "app", "user", null, "default");
+ ApplicationAttemptId attemptId =
+ app.getCurrentAppAttempt().getAppAttemptId();
+ MockAM am = MockRM.launchAndRegisterAM(app, rm, nm);
+ ResourceScheduler scheduler = rm.getResourceScheduler();
+ SchedulerApplicationAttempt schedulerAttempt =
+ ((CapacityScheduler)scheduler).getApplicationAttempt(attemptId);
+ RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm.getNodeId());
+ nm.nodeHeartbeat(true);
+ ((RMNodeImpl) rmNode1)
+ .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+ // Send add and update node events to AM Service.
+ amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+ amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ try {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override public Boolean get() {
+ return scheduler.getNumClusterNodes() == 1;
+ }
+ }, 10, 200 * 100);
+ }catch (TimeoutException e) {
+ fail("timed out while waiting for NM to add.");
+ }
+ AllocateResponse allocateResponse = am.allocate(
+ Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+ "*", Resources.createResource(1 * GB), 2, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true))),
+ null);
+ List<Container> allocatedContainers = allocateResponse
+ .getAllocatedContainers();
+ Container container = allocatedContainers.get(0);
+ scheduler.handle(new NodeRemovedSchedulerEvent(rmNode1));
+ try {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override public Boolean get() {
+ return scheduler.getNumClusterNodes() == 0;
+ }
+ }, 10, 200 * 100);
+ }catch (TimeoutException e) {
+ fail("timed out while waiting for NM to remove.");
+ }
+ //test YARN-9165
+ RMContainer rmContainer = null;
+ rmContainer = SchedulerUtils.createOpportunisticRmContainer(
+ rm.getRMContext(), container, true);
+ if (rmContainer == null) {
+ rmContainer = new RMContainerImpl(container,
+ SchedulerRequestKey.extractFrom(container),
+ schedulerAttempt.getApplicationAttemptId(), container.getNodeId(),
+ schedulerAttempt.getUser(), rm.getRMContext(), true);
+ }
+ assert(rmContainer!=null);
+ //test YARN-9164
+ schedulerAttempt.addRMContainer(container.getId(), rmContainer);
+ scheduler.handle(new AppAttemptRemovedSchedulerEvent(attemptId,
+ RMAppAttemptState.FAILED, false));
+ }
+
private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime,
int queueLength) {
OpportunisticContainersStatus status1 =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]