Repository: hadoop Updated Branches: refs/heads/branch-2.8 c328ab47f -> c98963b55
YARN-7102. NM heartbeat stuck when responseId overflows MAX_INT. Contributed by Botong Huang Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c98963b5 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c98963b5 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c98963b5 Branch: refs/heads/branch-2.8 Commit: c98963b558ffcb9f698b501742197a40f6326f17 Parents: c328ab4 Author: Jason Lowe <[email protected]> Authored: Thu Jan 25 18:04:24 2018 -0600 Committer: Jason Lowe <[email protected]> Committed: Thu Jan 25 18:04:24 2018 -0600 ---------------------------------------------------------------------- .../hadoop/yarn/sls/appmaster/AMSimulator.java | 4 +- .../yarn/sls/nodemanager/NMSimulator.java | 4 +- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 11 +--- .../yarn/sls/scheduler/RMNodeWrapper.java | 12 +--- .../resourcemanager/ResourceTrackerService.java | 67 ++++++++++++++------ .../server/resourcemanager/rmnode/RMNode.java | 13 ++-- .../resourcemanager/rmnode/RMNodeImpl.java | 48 +++----------- .../rmnode/RMNodeStatusEvent.java | 13 +--- .../yarn/server/resourcemanager/MockNM.java | 16 +++-- .../yarn/server/resourcemanager/MockNodes.java | 9 +-- .../resourcemanager/TestRMNodeTransitions.java | 11 +--- .../TestResourceTrackerService.java | 29 ++++++++- .../TestRMAppLogAggregationStatus.java | 10 +-- 13 files changed, 120 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c98963b5/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 2272e3e..ff68e96 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -90,7 +90,7 @@ public abstract class AMSimulator extends TaskRunner.Task { RecordFactoryProvider.getRecordFactory(null); // response queue protected final BlockingQueue<AllocateResponse> responseQueue; - protected int RESPONSE_ID = 1; + private int responseId = 0; // user name protected String user; // queue name @@ -213,7 +213,7 @@ public abstract class AMSimulator extends TaskRunner.Task { List<ContainerId> toRelease) { AllocateRequest allocateRequest = recordFactory.newRecordInstance(AllocateRequest.class); - allocateRequest.setResponseId(RESPONSE_ID ++); + allocateRequest.setResponseId(responseId++); allocateRequest.setAskList(ask); allocateRequest.setReleaseList(toRelease); return allocateRequest; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c98963b5/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java index 0947ba8..69c9a13 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java @@ -73,7 +73,7 @@ public class NMSimulator extends TaskRunner.Task { // resource manager private ResourceManager rm; // heart beat response id - private int RESPONSE_ID = 1; + private int responseId = 0; private final static Logger LOG = Logger.getLogger(NMSimulator.class); public void init(String nodeIdStr, int memory, int cores, @@ -134,7 +134,7 @@ public class NMSimulator extends TaskRunner.Task { ns.setContainersStatuses(generateContainerStatusList()); ns.setNodeId(node.getNodeID()); ns.setKeepAliveApplications(new ArrayList<ApplicationId>()); - ns.setResponseId(RESPONSE_ID ++); + ns.setResponseId(responseId++); ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0)); beatRequest.setNodeStatus(ns); NodeHeartbeatResponse beatResponse = http://git-wip-us.apache.org/repos/asf/hadoop/blob/c98963b5/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 951f5a8..8c8b849 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -143,8 +143,8 @@ public class NodeInfo { return runningApplications; } - public void updateNodeHeartbeatResponseForCleanup( - NodeHeartbeatResponse response) { + public void setAndUpdateNodeHeartbeatResponse( + NodeHeartbeatResponse response) { } public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { @@ -178,13 +178,6 @@ public class NodeInfo { } @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( - NodeHeartbeatResponse response) { - // TODO Auto-generated method stub - - } - - @Override public List<Container> pullNewlyIncreasedContainers() { // TODO Auto-generated method stub return null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c98963b5/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index e5013c4..4c2a615 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -126,9 +126,9 @@ public class RMNodeWrapper implements RMNode { } @Override - public void updateNodeHeartbeatResponseForCleanup( - NodeHeartbeatResponse nodeHeartbeatResponse) { - node.updateNodeHeartbeatResponseForCleanup(nodeHeartbeatResponse); + public void setAndUpdateNodeHeartbeatResponse( + NodeHeartbeatResponse nodeHeartbeatResponse) { + node.setAndUpdateNodeHeartbeatResponse(nodeHeartbeatResponse); } @Override @@ -166,12 +166,6 @@ public class RMNodeWrapper implements RMNode { return RMNodeLabelsManager.EMPTY_STRING_SET; } - @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( - NodeHeartbeatResponse response) { - // TODO Auto-generated method stub - } - @SuppressWarnings("unchecked") @Override public List<Container> pullNewlyIncreasedContainers() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c98963b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 3a3dd63..89ebee4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -30,6 +30,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -77,6 +78,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; @@ -386,14 +388,37 @@ public class ResourceTrackerService extends AbstractService implements } else { LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); - // Reset heartbeat ID since node just restarted. - oldNode.resetLastNodeHeartBeatResponse(); - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeReconnectEvent(nodeId, rmNode, request - .getRunningApplications(), request.getNMContainerStatuses())); + + if (CollectionUtils.isEmpty(request.getRunningApplications()) + && rmNode.getState() != NodeState.DECOMMISSIONING + && rmNode.getHttpPort() != oldNode.getHttpPort()) { + // Reconnected node differs, so replace old node and start new node + switch (rmNode.getState()) { + case RUNNING: + ClusterMetrics.getMetrics().decrNumActiveNodes(); + break; + case UNHEALTHY: + ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); + break; + default: + LOG.debug("Unexpected Rmnode state"); + } + this.rmContext.getDispatcher().getEventHandler() + .handle(new NodeRemovedSchedulerEvent(rmNode)); + + this.rmContext.getRMNodes().put(nodeId, rmNode); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeStartedEvent(nodeId, null, null)); + + } else { + // Reset heartbeat ID since node just restarted. + oldNode.resetLastNodeHeartBeatResponse(); + + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeReconnectEvent(nodeId, rmNode, + request.getRunningApplications(), + request.getNMContainerStatuses())); + } } // On every node manager register we will be clearing NMToken keys if // present for any running application. @@ -490,12 +515,13 @@ public class ResourceTrackerService extends AbstractService implements // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); - if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse - .getResponseId()) { + if (getNextResponseId( + remoteNodeStatus.getResponseId()) == lastNodeHeartbeatResponse + .getResponseId()) { LOG.info("Received duplicate heartbeat from node " + rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId()); return lastNodeHeartbeatResponse; - } else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse + } else if (remoteNodeStatus.getResponseId() != lastNodeHeartbeatResponse .getResponseId()) { String message = "Too far behind rm response id:" @@ -510,13 +536,11 @@ public class ResourceTrackerService extends AbstractService implements } // Heartbeat response - NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils - .newNodeHeartbeatResponse(lastNodeHeartbeatResponse. - getResponseId() + 1, NodeAction.NORMAL, null, null, null, null, - nextHeartBeatInterval); - rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); - rmNode.updateNodeHeartbeatResponseForContainersDecreasing( - nodeHeartBeatResponse); + NodeHeartbeatResponse nodeHeartBeatResponse = + YarnServerBuilderUtils.newNodeHeartbeatResponse( + getNextResponseId(lastNodeHeartbeatResponse.getResponseId()), + NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval); + rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse); populateKeys(request, nodeHeartBeatResponse); @@ -528,7 +552,7 @@ public class ResourceTrackerService extends AbstractService implements // 4. Send status to RMNode, saving the latest response. RMNodeStatusEvent nodeStatusEvent = - new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse); + new RMNodeStatusEvent(nodeId, remoteNodeStatus); if (request.getLogAggregationReportsForApps() != null && !request.getLogAggregationReportsForApps().isEmpty()) { nodeStatusEvent.setLogAggregationReportsForApps(request @@ -562,6 +586,11 @@ public class ResourceTrackerService extends AbstractService implements return nodeHeartBeatResponse; } + private int getNextResponseId(int responseId) { + // Loop between 0 and Integer.MAX_VALUE + return (responseId + 1) & Integer.MAX_VALUE; + } + /** * Check if node in decommissioning state. * @param nodeId http://git-wip-us.apache.org/repos/asf/hadoop/blob/c98963b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 48df1e8..2e164a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -134,10 +134,11 @@ public interface RMNode { /** * Update a {@link NodeHeartbeatResponse} with the list of containers and - * applications to clean up for this node. + * applications to clean up for this node, and the containers to be updated. + * * @param response the {@link NodeHeartbeatResponse} to update */ - public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response); + void setAndUpdateNodeHeartbeatResponse(NodeHeartbeatResponse response); public NodeHeartbeatResponse getLastNodeHeartBeatResponse(); @@ -160,13 +161,7 @@ public interface RMNode { * @return labels in this node */ public Set<String> getNodeLabels(); - - /** - * Update containers to be decreased - */ - public void updateNodeHeartbeatResponseForContainersDecreasing( - NodeHeartbeatResponse response); - + public List<Container> pullNewlyIncreasedContainers(); long getUntrackedTimeStamp(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c98963b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 830a6a9..eca961b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -559,7 +559,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { }; @Override - public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) { + public void setAndUpdateNodeHeartbeatResponse( + NodeHeartbeatResponse response) { this.writeLock.lock(); try { @@ -573,6 +574,14 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { this.finishedApplications.clear(); this.containersToSignal.clear(); this.containersToBeRemovedFromNM.clear(); + + // NOTE: This is required for backward compatibility. + response.addAllContainersToDecrease(toBeDecreasedContainers.values()); + toBeDecreasedContainers.clear(); + + // Synchronously update the last response in rmNode with updated + // responseId + this.latestNodeHeartBeatResponse = response; } finally { this.writeLock.unlock(); } @@ -582,25 +591,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { public Collection<Container> getToBeDecreasedContainers() { return toBeDecreasedContainers.values(); } - - @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( - NodeHeartbeatResponse response) { - this.writeLock.lock(); - - try { - response.addAllContainersToDecrease(toBeDecreasedContainers.values()); - toBeDecreasedContainers.clear(); - } finally { - this.writeLock.unlock(); - } - } @Override public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { - this.readLock.lock(); - try { return this.latestNodeHeartBeatResponse; } finally { @@ -848,21 +842,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(rmNode)); } - } else { - // Reconnected node differs, so replace old node and start new node - switch (rmNode.getState()) { - case RUNNING: - ClusterMetrics.getMetrics().decrNumActiveNodes(); - break; - case UNHEALTHY: - ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); - break; - default: - LOG.debug("Unexpected Rmnode state"); - } - rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); - rmNode.context.getDispatcher().getEventHandler().handle( - new RMNodeStartedEvent(newNode.getNodeID(), null, null)); } } else { @@ -1118,10 +1097,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event; - - // Switch the last heartbeatresponse. - rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse(); - NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus(); rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); @@ -1190,9 +1165,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { @Override public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNodeStatusEvent statusEvent = (RMNodeStatusEvent)event; - - // Switch the last heartbeatresponse. - rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse(); NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus(); rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c98963b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index ba6ac9b..411c0f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -27,27 +27,22 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; -import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; public class RMNodeStatusEvent extends RMNodeEvent { private final NodeStatus nodeStatus; - private final NodeHeartbeatResponse latestResponse; private List<LogAggregationReport> logAggregationReportsForApps; - public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus, - NodeHeartbeatResponse latestResponse) { - this(nodeId, nodeStatus, latestResponse, null); + public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus) { + this(nodeId, nodeStatus, null); } public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus, - NodeHeartbeatResponse latestResponse, List<LogAggregationReport> logAggregationReportsForApps) { super(nodeId, RMNodeEventType.STATUS_UPDATE); this.nodeStatus = nodeStatus; - this.latestResponse = latestResponse; this.logAggregationReportsForApps = logAggregationReportsForApps; } @@ -59,10 +54,6 @@ public class RMNodeStatusEvent extends RMNodeEvent { return this.nodeStatus.getContainersStatuses(); } - public NodeHeartbeatResponse getLatestResponse() { - return this.latestResponse; - } - public List<ApplicationId> getKeepAliveAppIds() { return this.nodeStatus.getKeepAliveApplications(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c98963b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 2e95ffe..107c224 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -116,7 +116,7 @@ public class MockNM { container.getResource()); List<Container> increasedConts = Collections.singletonList(container); nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts, - true, ++responseId); + true, responseId); } public RegisterNodeManagerResponse registerNode() throws Exception { @@ -161,12 +161,13 @@ public class MockNM { memory = newResource.getMemory(); vCores = newResource.getVirtualCores(); } + responseId = 0; return registrationResponse; } public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception { return nodeHeartbeat(Collections.<ContainerStatus>emptyList(), - Collections.<Container>emptyList(), isHealthy, ++responseId); + Collections.<Container>emptyList(), isHealthy, responseId); } public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, @@ -179,12 +180,12 @@ public class MockNM { containerStatusList.add(containerStatus); Log.info("ContainerStatus: " + containerStatus); return nodeHeartbeat(containerStatusList, - Collections.<Container>emptyList(), true, ++responseId); + Collections.<Container>emptyList(), true, responseId); } public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId, List<ContainerStatus>> conts, boolean isHealthy) throws Exception { - return nodeHeartbeat(conts, isHealthy, ++responseId); + return nodeHeartbeat(conts, isHealthy, responseId); } public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId, @@ -227,7 +228,8 @@ public class MockNM { req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey); NodeHeartbeatResponse heartbeatResponse = resourceTracker.nodeHeartbeat(req); - + responseId = heartbeatResponse.getResponseId(); + MasterKey masterKeyFromRM = heartbeatResponse.getContainerTokenMasterKey(); if (masterKeyFromRM != null && masterKeyFromRM.getKeyId() != this.currentContainerTokenMasterKey @@ -262,4 +264,8 @@ public class MockNM { public String getVersion() { return version; } + + public void setResponseId(int id) { + this.responseId = id; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c98963b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 83e901d..4bfb189 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -200,7 +200,8 @@ public class MockNodes { } @Override - public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) { + public void setAndUpdateNodeHeartbeatResponse( + NodeHeartbeatResponse response) { } @Override @@ -241,12 +242,6 @@ public class MockNodes { } @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( - NodeHeartbeatResponse response) { - - } - - @Override public List<Container> pullNewlyIncreasedContainers() { return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c98963b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 5a462ea..67920a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -158,15 +158,12 @@ public class TestRMNodeTransitions { private RMNodeStatusEvent getMockRMNodeStatusEvent( List<ContainerStatus> containerStatus) { - NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class); - NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); Boolean yes = new Boolean(true); doReturn(yes).when(healthStatus).getIsNodeHealthy(); RMNodeStatusEvent event = mock(RMNodeStatusEvent.class); doReturn(healthStatus).when(event).getNodeHealthStatus(); - doReturn(response).when(event).getLatestResponse(); doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType(); if (containerStatus != null) { doReturn(containerStatus).when(event).getContainers(); @@ -175,15 +172,12 @@ public class TestRMNodeTransitions { } private RMNodeStatusEvent getMockRMNodeStatusEventWithRunningApps() { - NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class); - NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); Boolean yes = new Boolean(true); doReturn(yes).when(healthStatus).getIsNodeHealthy(); RMNodeStatusEvent event = mock(RMNodeStatusEvent.class); doReturn(healthStatus).when(event).getNodeHealthStatus(); - doReturn(response).when(event).getLatestResponse(); doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType(); doReturn(getAppIdList()).when(event).getKeepAliveAppIds(); return event; @@ -196,15 +190,12 @@ public class TestRMNodeTransitions { } private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() { - NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class); - NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); Boolean yes = new Boolean(true); doReturn(yes).when(healthStatus).getIsNodeHealthy(); RMNodeStatusEvent event = mock(RMNodeStatusEvent.class); doReturn(healthStatus).when(event).getNodeHealthStatus(); - doReturn(response).when(event).getLatestResponse(); doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType(); doReturn(null).when(event).getKeepAliveAppIds(); return event; @@ -651,7 +642,7 @@ public class TestRMNodeTransitions { Assert.assertEquals(1, node.getContainersToCleanUp().size()); Assert.assertEquals(1, node.getAppsToCleanup().size()); NodeHeartbeatResponse hbrsp = Records.newRecord(NodeHeartbeatResponse.class); - node.updateNodeHeartbeatResponseForCleanup(hbrsp); + node.setAndUpdateNodeHeartbeatResponse(hbrsp); Assert.assertEquals(0, node.getContainersToCleanUp().size()); Assert.assertEquals(0, node.getAppsToCleanup().size()); Assert.assertEquals(1, hbrsp.getContainersToCleanup().size()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c98963b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index dbcbe30..14b6dbd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -706,7 +706,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { Records.newRecord(NodeHeartbeatRequest.class); heartbeatReq.setNodeLabels(null); // Node heartbeat label update nodeStatusObject = getNodeStatusObject(nodeId); - nodeStatusObject.setResponseId(responseId+2); + nodeStatusObject.setResponseId(responseId+1); heartbeatReq.setNodeStatus(nodeStatusObject); heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse .getNMTokenMasterKey()); @@ -1908,4 +1908,31 @@ public class TestResourceTrackerService extends NodeLabelTestBase { DefaultMetricsSystem.shutdown(); } } + + @Test + public void testResponseIdOverflow() throws Exception { + Configuration conf = new Configuration(); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + + NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); + + // prepare the responseId that's about to overflow + RMNode node = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + node.getLastNodeHeartBeatResponse().setResponseId(Integer.MAX_VALUE); + + nm1.setResponseId(Integer.MAX_VALUE); + + // heartbeat twice and check responseId + nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); + Assert.assertEquals(0, nodeHeartbeat.getResponseId()); + + nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); + Assert.assertEquals(1, nodeHeartbeat.getResponseId()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c98963b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 0c1777b..4b110c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -167,7 +167,7 @@ public class TestRMAppLogAggregationStatus { NodeStatus nodeStatus1 = NodeStatus.newInstance(node1.getNodeID(), 0, new ArrayList<ContainerStatus>(), null, NodeHealthStatus.newInstance(true, null, 0), null, null, null); - node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null, + node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, node1ReportForApp)); List<LogAggregationReport> node2ReportForApp = @@ -181,7 +181,7 @@ public class TestRMAppLogAggregationStatus { NodeStatus nodeStatus2 = NodeStatus.newInstance(node2.getNodeID(), 0, new ArrayList<ContainerStatus>(), null, NodeHealthStatus.newInstance(true, null, 0), null, null, null); - node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null, + node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, node2ReportForApp)); // node1 and node2 has updated its log aggregation status // verify that the log aggregation status for node1, node2 @@ -218,7 +218,7 @@ public class TestRMAppLogAggregationStatus { LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING, messageForNode1_2); node1ReportForApp2.add(report1_2); - node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null, + node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, node1ReportForApp2)); // verify that the log aggregation status for node1 @@ -286,7 +286,7 @@ public class TestRMAppLogAggregationStatus { LogAggregationStatus.SUCCEEDED, "")); // For every logAggregationReport cached in memory, we can only save at most // 10 diagnostic messages/failure messages - node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null, + node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, node1ReportForApp3)); logAggregationStatus = rmApp.getLogAggregationReportsForApp(); @@ -330,7 +330,7 @@ public class TestRMAppLogAggregationStatus { LogAggregationStatus.FAILED, ""); node2ReportForApp2.add(report2_2); node2ReportForApp2.add(report2_3); - node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null, + node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, node2ReportForApp2)); Assert.assertEquals(LogAggregationStatus.FAILED, rmApp.getLogAggregationStatusForAppReport()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
