Repository: hadoop
Updated Branches:
  refs/heads/branch-2 1725d5613 -> 4b50e2327
YARN-2630. Prevented previous AM container status from being acquired by the current restarted AM. Contributed by Jian He. (cherry picked from commit 52bbe0f11bc8e97df78a1ab9b63f4eff65fd7a76) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4b50e232 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4b50e232 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4b50e232 Branch: refs/heads/branch-2 Commit: 4b50e23271f3de5b18f0305cb82acbe0a5a97328 Parents: 1725d56 Author: Zhijie Shen <zjs...@apache.org> Authored: Wed Oct 1 15:38:11 2014 -0700 Committer: Zhijie Shen <zjs...@apache.org> Committed: Wed Oct 1 15:39:36 2014 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../distributedshell/ApplicationMaster.java | 1 + .../protocolrecords/NodeHeartbeatResponse.java | 9 +-- .../impl/pb/NodeHeartbeatResponsePBImpl.java | 45 ++++++------- .../yarn_server_common_service_protos.proto | 2 +- .../nodemanager/NodeStatusUpdaterImpl.java | 2 +- .../nodemanager/TestNodeStatusUpdater.java | 2 +- .../rmapp/attempt/RMAppAttemptImpl.java | 69 +++++++++++++------- .../resourcemanager/rmnode/RMNodeImpl.java | 25 +++---- .../applicationsmanager/TestAMRestart.java | 24 ++----- .../attempt/TestRMAppAttemptTransitions.java | 12 ++-- 11 files changed, 108 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b50e232/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e8db12d..bbd6013 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -470,6 +470,9 @@ Release 2.6.0 - UNRELEASED YARN-2602. Fixed possible NPE in ApplicationHistoryManagerOnTimelineStore. 
(Zhijie Shen via jianhe) + YARN-2630. Prevented previous AM container status from being acquired by the + current restarted AM. (Jian He via zjshen) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b50e232/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 2451030..df9f34b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -668,6 +668,7 @@ public class ApplicationMaster { + ", completed=" + numCompletedContainers.get() + ", allocated=" + numAllocatedContainers.get() + ", failed=" + numFailedContainers.get(); + LOG.info(appMessage); success = false; } try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b50e232/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 9887acc..12e1f54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -30,7 +30,7 @@ public interface NodeHeartbeatResponse { NodeAction getNodeAction(); List<ContainerId> getContainersToCleanup(); - List<ContainerId> getFinishedContainersPulledByAM(); + List<ContainerId> getContainersToBeRemovedFromNM(); List<ApplicationId> getApplicationsToCleanup(); @@ -45,9 +45,10 @@ public interface NodeHeartbeatResponse { void addAllContainersToCleanup(List<ContainerId> containers); - // This tells NM to remove finished containers only after the AM - // has actually received it in a previous allocate response - void addFinishedContainersPulledByAM(List<ContainerId> containers); + // This tells NM to remove finished containers from its context. 
Currently, NM + // will remove finished containers from its context only after AM has actually + // received the finished containers in a previous allocate response + void addContainersToBeRemovedFromNM(List<ContainerId> containers); void addAllApplicationsToCleanup(List<ApplicationId> applications); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b50e232/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index e9296f4..78979d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -40,13 +40,14 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; -public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponseProto> implements NodeHeartbeatResponse { +public class NodeHeartbeatResponsePBImpl extends + ProtoBase<NodeHeartbeatResponseProto> implements NodeHeartbeatResponse { NodeHeartbeatResponseProto proto = NodeHeartbeatResponseProto.getDefaultInstance(); NodeHeartbeatResponseProto.Builder builder = null; boolean viaProto = false; private List<ContainerId> containersToCleanup = null; - private List<ContainerId> 
finishedContainersPulledByAM = null; + private List<ContainerId> containersToBeRemovedFromNM = null; private List<ApplicationId> applicationsToCleanup = null; private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; @@ -74,8 +75,8 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse if (this.applicationsToCleanup != null) { addApplicationsToCleanupToProto(); } - if (this.finishedContainersPulledByAM != null) { - addFinishedContainersPulledByAMToProto(); + if (this.containersToBeRemovedFromNM != null) { + addContainersToBeRemovedFromNMToProto(); } if (this.containerTokenMasterKey != null) { builder.setContainerTokenMasterKey( @@ -204,9 +205,9 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse } @Override - public List<ContainerId> getFinishedContainersPulledByAM() { - initFinishedContainersPulledByAM(); - return this.finishedContainersPulledByAM; + public List<ContainerId> getContainersToBeRemovedFromNM() { + initContainersToBeRemovedFromNM(); + return this.containersToBeRemovedFromNM; } private void initContainersToCleanup() { @@ -222,16 +223,16 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse } } - private void initFinishedContainersPulledByAM() { - if (this.finishedContainersPulledByAM != null) { + private void initContainersToBeRemovedFromNM() { + if (this.containersToBeRemovedFromNM != null) { return; } NodeHeartbeatResponseProtoOrBuilder p = viaProto ? 
proto : builder; - List<ContainerIdProto> list = p.getFinishedContainersPulledByAmList(); - this.finishedContainersPulledByAM = new ArrayList<ContainerId>(); + List<ContainerIdProto> list = p.getContainersToBeRemovedFromNmList(); + this.containersToBeRemovedFromNM = new ArrayList<ContainerId>(); for (ContainerIdProto c : list) { - this.finishedContainersPulledByAM.add(convertFromProtoFormat(c)); + this.containersToBeRemovedFromNM.add(convertFromProtoFormat(c)); } } @@ -245,12 +246,12 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse } @Override - public void addFinishedContainersPulledByAM( - final List<ContainerId> finishedContainersPulledByAM) { - if (finishedContainersPulledByAM == null) + public void + addContainersToBeRemovedFromNM(final List<ContainerId> containers) { + if (containers == null) return; - initFinishedContainersPulledByAM(); - this.finishedContainersPulledByAM.addAll(finishedContainersPulledByAM); + initContainersToBeRemovedFromNM(); + this.containersToBeRemovedFromNM.addAll(containers); } private void addContainersToCleanupToProto() { @@ -288,10 +289,10 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse builder.addAllContainersToCleanup(iterable); } - private void addFinishedContainersPulledByAMToProto() { + private void addContainersToBeRemovedFromNMToProto() { maybeInitBuilder(); - builder.clearFinishedContainersPulledByAm(); - if (finishedContainersPulledByAM == null) + builder.clearContainersToBeRemovedFromNm(); + if (containersToBeRemovedFromNM == null) return; Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() { @@ -299,7 +300,7 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse public Iterator<ContainerIdProto> iterator() { return new Iterator<ContainerIdProto>() { - Iterator<ContainerId> iter = finishedContainersPulledByAM.iterator(); + Iterator<ContainerId> iter = containersToBeRemovedFromNM.iterator(); @Override 
public boolean hasNext() { @@ -320,7 +321,7 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse } }; - builder.addAllFinishedContainersPulledByAm(iterable); + builder.addAllContainersToBeRemovedFromNm(iterable); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b50e232/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 600f54d..d0990fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -58,7 +58,7 @@ message NodeHeartbeatResponseProto { repeated ApplicationIdProto applications_to_cleanup = 6; optional int64 nextHeartBeatInterval = 7; optional string diagnostics_message = 8; - repeated ContainerIdProto finished_containers_pulled_by_am = 9; + repeated ContainerIdProto containers_to_be_removed_from_nm = 9; } message NMContainerStatusProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b50e232/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index b4dcf1f..eecba39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -558,7 +558,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // when NM re-registers with RM. // Only remove the cleanedup containers that are acked removeCompletedContainersFromContext(response - .getFinishedContainersPulledByAM()); + .getContainersToBeRemovedFromNM()); lastHeartBeatID = response.getResponseId(); List<ContainerId> containersToCleanup = response http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b50e232/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 8fb51a3..7837846 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -692,7 +692,7 @@ public class 
TestNodeStatusUpdater { NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, null, null, null, 1000L); - nhResponse.addFinishedContainersPulledByAM(finishedContainersPulledByAM); + nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM); return nhResponse; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b50e232/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index d75a871..fbcb7d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -687,20 +687,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { // A new allocate means the AM received the previously sent // finishedContainers. 
We can ack this to NM now - for (NodeId nodeId:finishedContainersSentToAM.keySet()) { - - // Clear and get current values - List<ContainerStatus> currentSentContainers = - finishedContainersSentToAM - .put(nodeId, new ArrayList<ContainerStatus>()); - List<ContainerId> containerIdList = new ArrayList<ContainerId> - (currentSentContainers.size()); - for (ContainerStatus containerStatus:currentSentContainers) { - containerIdList.add(containerStatus.getContainerId()); - } - eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent( - nodeId, containerIdList)); - } + sendFinishedContainersToNM(); // Mark every containerStatus as being sent to AM though we may return // only the ones that belong to the current attempt @@ -1592,14 +1579,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { ContainerStatus containerStatus = containerFinishedEvent.getContainerStatus(); - // Add all finished containers so that they can be acked to NM - addJustFinishedContainer(appAttempt, containerFinishedEvent); - // Is this container the AmContainer? If the finished container is same as // the AMContainer, AppAttempt fails if (appAttempt.masterContainer != null && appAttempt.masterContainer.getId().equals( containerStatus.getContainerId())) { + appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent); // Remember the follow up transition and save the final attempt state. appAttempt.rememberTargetTransitionsAndStoreState(event, @@ -1607,10 +1592,46 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { return RMAppAttemptState.FINAL_SAVING; } + // Add all finished containers so that they can be acked to NM + addJustFinishedContainer(appAttempt, containerFinishedEvent); return this.currentState; } } + + // Ack NM to remove finished containers from context. 
+ private void sendFinishedContainersToNM() { + for (NodeId nodeId : finishedContainersSentToAM.keySet()) { + + // Clear and get current values + List<ContainerStatus> currentSentContainers = + finishedContainersSentToAM.put(nodeId, + new ArrayList<ContainerStatus>()); + List<ContainerId> containerIdList = + new ArrayList<ContainerId>(currentSentContainers.size()); + for (ContainerStatus containerStatus : currentSentContainers) { + containerIdList.add(containerStatus.getContainerId()); + } + eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId, + containerIdList)); + } + } + + // Add am container to the list so that am container instance will be + // removed from NMContext. + private void sendAMContainerToNM(RMAppAttemptImpl appAttempt, + RMAppAttemptContainerFinishedEvent containerFinishedEvent) { + NodeId nodeId = containerFinishedEvent.getNodeId(); + finishedContainersSentToAM.putIfAbsent(nodeId, + new ArrayList<ContainerStatus>()); + appAttempt.finishedContainersSentToAM.get(nodeId).add( + containerFinishedEvent.getContainerStatus()); + if (!appAttempt.getSubmissionContext() + .getKeepContainersAcrossApplicationAttempts()) { + appAttempt.sendFinishedContainersToNM(); + } + } + private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt, RMAppAttemptContainerFinishedEvent containerFinishedEvent) { appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent @@ -1661,16 +1682,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { ContainerStatus containerStatus = containerFinishedEvent.getContainerStatus(); - // Add all finished containers so that they can be acked to NM. - addJustFinishedContainer(appAttempt, containerFinishedEvent); - // Is this container the ApplicationMaster container? 
if (appAttempt.masterContainer.getId().equals( containerStatus.getContainerId())) { new FinalTransition(RMAppAttemptState.FINISHED).transition( appAttempt, containerFinishedEvent); + appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent); return RMAppAttemptState.FINISHED; } + // Add all finished containers so that they can be acked to NM. + addJustFinishedContainer(appAttempt, containerFinishedEvent); return RMAppAttemptState.FINISHING; } @@ -1686,14 +1707,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { ContainerStatus containerStatus = containerFinishedEvent.getContainerStatus(); - // Add all finished containers so that they can be acked to NM. - addJustFinishedContainer(appAttempt, containerFinishedEvent); - // If this is the AM container, it means the AM container is finished, // but we are not yet acknowledged that the final state has been saved. // Thus, we still return FINAL_SAVING state here. if (appAttempt.masterContainer.getId().equals( containerStatus.getContainerId())) { + appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent); + if (appAttempt.targetedFinalState.equals(RMAppAttemptState.FAILED) || appAttempt.targetedFinalState.equals(RMAppAttemptState.KILLED)) { // ignore Container_Finished Event if we were supposed to reach @@ -1708,6 +1728,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED); return; } + + // Add all finished containers so that they can be acked to NM. 
+ addJustFinishedContainer(appAttempt, containerFinishedEvent); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b50e232/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 1123a98..c960b50 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -112,8 +112,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>( new ContainerIdComparator()); - /* set of containers that were notified to AM about their completion */ - private final Set<ContainerId> finishedContainersPulledByAM = + /* + * set of containers to notify NM to remove them from its context. 
Currently, + * this includes containers that were notified to AM about their completion + */ + private final Set<ContainerId> containersToBeRemovedFromNM = new HashSet<ContainerId>(); /* the list of applications that have finished and need to be purged */ @@ -157,7 +160,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, - new FinishedContainersPulledByAMTransition()) + new AddContainersToBeRemovedFromNMTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, @@ -174,7 +177,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { new UpdateNodeResourceWhenUnusableTransition()) .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, - new FinishedContainersPulledByAMTransition()) + new AddContainersToBeRemovedFromNMTransition()) //Transitions from LOST state .addTransition(NodeState.LOST, NodeState.LOST, @@ -182,7 +185,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { new UpdateNodeResourceWhenUnusableTransition()) .addTransition(NodeState.LOST, NodeState.LOST, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, - new FinishedContainersPulledByAMTransition()) + new AddContainersToBeRemovedFromNMTransition()) //Transitions from UNHEALTHY state .addTransition(NodeState.UNHEALTHY, @@ -208,7 +211,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, - new FinishedContainersPulledByAMTransition()) + new AddContainersToBeRemovedFromNMTransition()) // 
create the topology tables .installTopology(); @@ -382,11 +385,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { response.addAllContainersToCleanup( new ArrayList<ContainerId>(this.containersToClean)); response.addAllApplicationsToCleanup(this.finishedApplications); - response.addFinishedContainersPulledByAM( - new ArrayList<ContainerId>(this.finishedContainersPulledByAM)); + response.addContainersToBeRemovedFromNM( + new ArrayList<ContainerId>(this.containersToBeRemovedFromNM)); this.containersToClean.clear(); this.finishedApplications.clear(); - this.finishedContainersPulledByAM.clear(); + this.containersToBeRemovedFromNM.clear(); } finally { this.writeLock.unlock(); } @@ -659,12 +662,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { } } - public static class FinishedContainersPulledByAMTransition implements + public static class AddContainersToBeRemovedFromNMTransition implements SingleArcTransition<RMNodeImpl, RMNodeEvent> { @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { - rmNode.finishedContainersPulledByAM.addAll((( + rmNode.containersToBeRemovedFromNM.addAll((( RMNodeFinishedContainersPulledByAMEvent) event).getContainers()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b50e232/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index ba592fc..fcb4e45 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -98,9 +98,6 @@ public class TestAMRestart { Thread.sleep(200); } - ContainerId amContainerId = ContainerId.newInstance(am1 - .getApplicationAttemptId(), 1); - // launch the 2nd container, for testing running container transferred. nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); ContainerId containerId2 = @@ -199,15 +196,11 @@ public class TestAMRestart { // completed containerId4 is also transferred to the new attempt. RMAppAttempt newAttempt = app1.getRMAppAttempt(am2.getApplicationAttemptId()); - // 4 containers finished, acquired/allocated/reserved/completed + AM - // container. - waitForContainersToFinish(5, newAttempt); + // 4 containers finished, acquired/allocated/reserved/completed. + waitForContainersToFinish(4, newAttempt); boolean container3Exists = false, container4Exists = false, container5Exists = - false, container6Exists = false, amContainerExists = false; + false, container6Exists = false; for(ContainerStatus status : newAttempt.getJustFinishedContainers()) { - if(status.getContainerId().equals(amContainerId)) { - amContainerExists = true; - } if(status.getContainerId().equals(containerId3)) { // containerId3 is the container ran by previous attempt but finished by the // new attempt. 
@@ -227,11 +220,8 @@ public class TestAMRestart { container6Exists = true; } } - Assert.assertTrue(amContainerExists); - Assert.assertTrue(container3Exists); - Assert.assertTrue(container4Exists); - Assert.assertTrue(container5Exists); - Assert.assertTrue(container6Exists); + Assert.assertTrue(container3Exists && container4Exists && container5Exists + && container6Exists); // New SchedulerApplicationAttempt also has the containers info. rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); @@ -250,14 +240,14 @@ public class TestAMRestart { // all 4 normal containers finished. System.out.println("New attempt's just finished containers: " + newAttempt.getJustFinishedContainers()); - waitForContainersToFinish(6, newAttempt); + waitForContainersToFinish(5, newAttempt); rm1.stop(); } private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt) throws InterruptedException { int count = 0; - while (attempt.getJustFinishedContainers().size() < expectedNum + while (attempt.getJustFinishedContainers().size() != expectedNum && count < 500) { Thread.sleep(100); count++; http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b50e232/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 15028f9..7f27f4e 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -965,7 +965,7 @@ public class TestRMAppAttemptTransitions { sendAttemptUpdateSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); - assertEquals(2, applicationAttempt.getJustFinishedContainers().size()); + assertEquals(0, applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); assertEquals(0, application.getRanNodes().size()); String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", @@ -1003,7 +1003,7 @@ public class TestRMAppAttemptTransitions { sendAttemptUpdateSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.KILLED, applicationAttempt.getAppAttemptState()); - assertEquals(1,applicationAttempt.getJustFinishedContainers().size()); + assertEquals(0, applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); assertEquals(0, application.getRanNodes().size()); String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", @@ -1192,7 +1192,7 @@ public class TestRMAppAttemptTransitions { BuilderUtils.newContainerStatus(amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId)); testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, - diagnostics, 1, false); + diagnostics, 0, false); } // While attempt is at FINAL_SAVING, Contaienr_Finished event may come before @@ -1225,7 +1225,7 @@ public class TestRMAppAttemptTransitions { // send attempt_saved sendAttemptUpdateSavedEvent(applicationAttempt); testAppAttemptFinishedState(amContainer, finalStatus, 
trackingUrl, - diagnostics, 1, false); + diagnostics, 0, false); } // While attempt is at FINAL_SAVING, Expire event may come before @@ -1381,13 +1381,13 @@ public class TestRMAppAttemptTransitions { verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); // failed attempt captured the container finished event. - assertEquals(1, applicationAttempt.getJustFinishedContainers().size()); + assertEquals(0, applicationAttempt.getJustFinishedContainers().size()); ContainerStatus cs2 = ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2), ContainerState.COMPLETE, "", 0); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( appAttemptId, cs2, anyNodeId)); - assertEquals(2, applicationAttempt.getJustFinishedContainers().size()); + assertEquals(1, applicationAttempt.getJustFinishedContainers().size()); boolean found = false; for (ContainerStatus containerStatus:applicationAttempt .getJustFinishedContainers()) {