This is an automated email from the ASF dual-hosted git repository. epayne pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 9176e8f YARN-9809. Added node manager health status to resource manager registration call. Contributed by Eric Badger via eyang 9176e8f is described below commit 9176e8fe5db97c9690f649373a4cf8c7b04365b8 Author: Eric Yang <ey...@apache.org> AuthorDate: Tue Jun 30 11:39:16 2020 -0700 YARN-9809. Added node manager health status to resource manager registration call. Contributed by Eric Badger via eyang (cherry picked from commit e8dc862d3856e9eaea124c625dade36f1dd53fe2) --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 7 +++ .../src/main/resources/yarn-default.xml | 7 +++ .../RegisterNodeManagerRequest.java | 19 ++++++- .../impl/pb/RegisterNodeManagerRequestPBImpl.java | 39 ++++++++++++- .../proto/yarn_server_common_service_protos.proto | 1 + .../server/nodemanager/NodeStatusUpdaterImpl.java | 3 +- .../nodemanager/health/NodeHealthScriptRunner.java | 11 +++- .../health/TimedHealthReporterService.java | 20 ++++++- .../yarn/server/nodemanager/TestEventFlow.java | 5 ++ .../containermanager/BaseContainerManagerTest.java | 66 +++++++++++++--------- .../containermanager/TestContainerManager.java | 6 +- .../nodemanager/containermanager/TestNMProxy.java | 4 +- .../scheduler/TestContainerSchedulerQueuing.java | 2 +- .../resourcemanager/ResourceTrackerService.java | 5 +- .../server/resourcemanager/rmnode/RMNodeImpl.java | 58 +++++++++++++++---- .../resourcemanager/rmnode/RMNodeStartedEvent.java | 10 +++- .../hadoop/yarn/server/resourcemanager/MockNM.java | 22 ++++++++ .../hadoop/yarn/server/resourcemanager/MockRM.java | 7 ++- .../yarn/server/resourcemanager/NodeManager.java | 3 +- .../resourcemanager/TestRMNodeTransitions.java | 55 +++++++++++++++--- .../resourcemanager/TestResourceManager.java | 29 ++++++---- .../TestResourceTrackerService.java | 6 ++ .../TestRMAppLogAggregationStatus.java | 7 ++- .../resourcetracker/TestNMExpiry.java | 7 +++ .../resourcetracker/TestNMReconnect.java | 7 +++ .../scheduler/TestAbstractYarnScheduler.java | 5 ++ .../scheduler/TestSchedulerHealth.java | 18 ++++-- .../scheduler/capacity/TestCapacityScheduler.java | 63 ++++++++++++++------- .../scheduler/fair/TestFairScheduler.java | 21 +++++-- .../scheduler/fifo/TestFifoScheduler.java | 25 +++++--- .../webapp/TestRMWebServicesNodes.java | 5 +- 31 files changed, 429 insertions(+), 114 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 25c2dd6..7ae5760 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2021,6 +2021,13 @@ public class YarnConfiguration extends Configuration { NM_PREFIX + "health-checker.interval-ms"; public static final long DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS = 10 * 60 * 1000; + /** Whether or not to run the node health script before the NM + * starts up.*/ + public static final String NM_HEALTH_CHECK_RUN_BEFORE_STARTUP = + NM_PREFIX + "health-checker.run-before-startup"; + public static final boolean DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP = + false; + /** Health check time out period for all scripts.*/ public static final String NM_HEALTH_CHECK_TIMEOUT_MS = NM_PREFIX + "health-checker.timeout-ms"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 48164b3..91a791e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1679,6 +1679,13 @@ </property> <property> + <description>Whether or not to run the node health script + before the NM starts up.</description> + <name>yarn.nodemanager.health-checker.run-before-startup</name> + <value>false</value> + </property> + + <property> <description>Frequency of running node health scripts.</description> <name>yarn.nodemanager.health-checker.interval-ms</name> <value>600000</value> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index acec16f..54b3915 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; public abstract class RegisterNodeManagerRequest { @@ -53,14 +54,15 @@ public abstract class RegisterNodeManagerRequest { Resource physicalResource) { return newInstance(nodeId, httpPort, resource, nodeManagerVersionId, containerStatuses, runningApplications, nodeLabels, physicalResource, - null); + null, null); } public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, List<NMContainerStatus> containerStatuses, List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels, - Resource physicalResource, Set<NodeAttribute> nodeAttributes) { + Resource physicalResource, Set<NodeAttribute> nodeAttributes, + NodeStatus nodeStatus) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -72,6 +74,7 @@ public abstract class RegisterNodeManagerRequest { request.setNodeLabels(nodeLabels); request.setPhysicalResource(physicalResource); request.setNodeAttributes(nodeAttributes); + request.setNodeStatus(nodeStatus); return request; } @@ -133,4 +136,16 @@ public abstract class RegisterNodeManagerRequest { public abstract Set<NodeAttribute> getNodeAttributes(); public abstract void setNodeAttributes(Set<NodeAttribute> nodeAttributes); + + /** + * Get the status of the node. + * @return The status of the node. + */ + public abstract NodeStatus getNodeStatus(); + + /** + * Set the status of the node. + * @param nodeStatus The status of the node. + */ + public abstract void setNodeStatus(NodeStatus nodeStatus); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 317f8ab..d91cff2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; @@ -51,7 +52,9 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeMa import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; - +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl; + public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest { RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance(); RegisterNodeManagerRequestProto.Builder builder = null; @@ -68,6 +71,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest /** Physical resources in the node. */ private Resource physicalResource = null; + private NodeStatus nodeStatus; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -121,6 +125,9 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest if (this.logAggregationReportsForApps != null) { addLogAggregationStatusForAppsToProto(); } + if (this.nodeStatus != null) { + builder.setNodeStatus(convertToProtoFormat(this.nodeStatus)); + } } private void addLogAggregationStatusForAppsToProto() { @@ -360,6 +367,28 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest } @Override + public synchronized NodeStatus getNodeStatus() { + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.nodeStatus != null) { + return this.nodeStatus; + } + if (!p.hasNodeStatus()) { + return null; + } + this.nodeStatus = convertFromProtoFormat(p.getNodeStatus()); + return this.nodeStatus; + } + + @Override + public synchronized void setNodeStatus(NodeStatus pNodeStatus) { + maybeInitBuilder(); + if (pNodeStatus == null) { + builder.clearNodeStatus(); + } + this.nodeStatus = pNodeStatus; + } + + @Override public int hashCode() { return getProto().hashCode(); } @@ -533,4 +562,12 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest } this.logAggregationReportsForApps = logAggregationStatusForApps; } + + private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto s) { + return new NodeStatusPBImpl(s); + } + + private NodeStatusProto convertToProtoFormat(NodeStatus s) { + return ((NodeStatusPBImpl)s).getProto(); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index ff7153e..c643179 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -74,6 +74,7 @@ message RegisterNodeManagerRequestProto { optional ResourceProto physicalResource = 9; repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10; optional NodeAttributesProto nodeAttributes = 11; + optional NodeStatusProto nodeStatus = 12; } message RegisterNodeManagerResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index dcfe83c..18a7e23 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -392,10 +392,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // during RM recovery synchronized (this.context) { List<NMContainerStatus> containerReports = getNMContainerStatuses(); + NodeStatus nodeStatus = getNodeStatus(0); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, nodeManagerVersionId, containerReports, getRunningApplications(), - nodeLabels, physicalResource, nodeAttributes); + nodeLabels, physicalResource, nodeAttributes, nodeStatus); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/NodeHealthScriptRunner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/NodeHealthScriptRunner.java index 1c9bd82..af92b15 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/NodeHealthScriptRunner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/NodeHealthScriptRunner.java @@ -60,8 +60,9 @@ public class NodeHealthScriptRunner extends TimedHealthReporterService { "Node health script timed out"; private NodeHealthScriptRunner(String scriptName, long checkInterval, - long timeout, String[] scriptArgs) { - super(NodeHealthScriptRunner.class.getName(), checkInterval); + long timeout, String[] scriptArgs, boolean runBeforeStartup) { + super(NodeHealthScriptRunner.class.getName(), checkInterval, + runBeforeStartup); this.nodeHealthScript = scriptName; this.scriptTimeout = timeout; setTimerTask(new NodeHealthMonitorExecutor(scriptArgs)); @@ -91,6 +92,10 @@ public class NodeHealthScriptRunner extends TimedHealthReporterService { "interval-ms can not be set to a negative number."); } + boolean runBeforeStartup = conf.getBoolean( + YarnConfiguration.NM_HEALTH_CHECK_RUN_BEFORE_STARTUP, + YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP); + // Determine time out String scriptTimeoutConfig = String.format( YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS_TEMPLATE, @@ -113,7 +118,7 @@ public class NodeHealthScriptRunner extends TimedHealthReporterService { String[] scriptArgs = conf.getStrings(scriptArgsConfig, new String[]{}); return new NodeHealthScriptRunner(nodeHealthScript, - checkIntervalMs, scriptTimeout, scriptArgs); + checkIntervalMs, scriptTimeout, scriptArgs, runBeforeStartup); } private enum HealthCheckerExitStatus { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/TimedHealthReporterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/TimedHealthReporterService.java index a0c4d8b..6a7a291 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/TimedHealthReporterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/TimedHealthReporterService.java @@ -45,6 +45,7 @@ public abstract class TimedHealthReporterService extends AbstractService private Timer timer; private TimerTask task; private long intervalMs; + private boolean runBeforeStartup; TimedHealthReporterService(String name, long intervalMs) { super(name); @@ -52,6 +53,17 @@ public abstract class TimedHealthReporterService extends AbstractService this.healthReport = ""; this.lastReportedTime = System.currentTimeMillis(); this.intervalMs = intervalMs; + this.runBeforeStartup = false; + } + + TimedHealthReporterService(String name, long intervalMs, + boolean runBeforeStartup) { + super(name); + this.isHealthy = true; + this.healthReport = ""; + this.lastReportedTime = System.currentTimeMillis(); + this.intervalMs = intervalMs; + this.runBeforeStartup = runBeforeStartup; } @VisibleForTesting @@ -73,7 +85,13 @@ public abstract class TimedHealthReporterService extends AbstractService throw new Exception("Health reporting task hasn't been set!"); } timer = new Timer("HealthReporterService-Timer", true); - timer.scheduleAtFixedRate(task, 0, intervalMs); + long delay = 0; + if (runBeforeStartup) { + delay = intervalMs; + task.run(); + } + + timer.scheduleAtFixedRate(task, delay, intervalMs); super.serviceStart(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index b1fc2f1..3f4879b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager; +import static org.mockito.Mockito.mock; + import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -134,6 +136,9 @@ public class TestEventFlow { new DummyContainerManager(context, exec, del, nodeStatusUpdater, metrics, dirsHandler); nodeStatusUpdater.init(conf); + NodeResourceMonitorImpl nodeResourceMonitor = mock( + NodeResourceMonitorImpl.class); + ((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor); ((NMContext)context).setContainerManager(containerManager); nodeStatusUpdater.start(); ((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 7a85bfa..9ee3ce6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.doNothing; +import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitorImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -156,32 +157,20 @@ public abstract class BaseContainerManagerTest { protected NodeHealthCheckerService nodeHealthChecker; protected LocalDirsHandlerService dirsHandler; protected final long DUMMY_RM_IDENTIFIER = 1234; + private NodeResourceMonitorImpl nodeResourceMonitor = mock( + NodeResourceMonitorImpl.class); + private NodeHealthCheckerService nodeHealthCheckerService; + private NodeStatusUpdater nodeStatusUpdater; + protected ContainerManagerImpl containerManager = null; - protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl( - context, new AsyncDispatcher(), null, metrics) { - @Override - protected ResourceTracker getRMClient() { - return new LocalRMInterface(); - }; - - @Override - protected void stopRMProxy() { - return; - } - - @Override - protected void startStatusUpdater() { - return; // Don't start any updating thread. - } - - @Override - public long getRMIdentifier() { - // There is no real RM registration, simulate and set RMIdentifier - return DUMMY_RM_IDENTIFIER; - } - }; + public NodeStatusUpdater getNodeStatusUpdater() { + return nodeStatusUpdater; + } - protected ContainerManagerImpl containerManager = null; + public void setNodeStatusUpdater( + NodeStatusUpdater nodeStatusUpdater) { + this.nodeStatusUpdater = nodeStatusUpdater; + } protected ContainerExecutor createContainerExecutor() { DefaultContainerExecutor exec = new DefaultContainerExecutor(); @@ -218,11 +207,36 @@ public abstract class BaseContainerManagerTest { delSrvc.init(conf); dirsHandler = new LocalDirsHandlerService(); - nodeHealthChecker = new NodeHealthCheckerService(dirsHandler); - nodeHealthChecker.init(conf); + dirsHandler.init(conf); + nodeHealthCheckerService = new NodeHealthCheckerService(dirsHandler); + nodeStatusUpdater = new NodeStatusUpdaterImpl( + context, new AsyncDispatcher(), nodeHealthCheckerService, metrics) { + @Override + protected ResourceTracker getRMClient() { + return new LocalRMInterface(); + }; + + @Override + protected void stopRMProxy() { + return; + } + + @Override + protected void startStatusUpdater() { + return; // Don't start any updating thread. + } + + @Override + public long getRMIdentifier() { + // There is no real RM registration, simulate and set RMIdentifier + return DUMMY_RM_IDENTIFIER; + } + }; + containerManager = createContainerManager(delSrvc); ((NMContext)context).setContainerManager(containerManager); ((NMContext)context).setContainerExecutor(exec); + ((NMContext)context).setNodeResourceMonitor(nodeResourceMonitor); nodeStatusUpdater.init(conf); containerManager.init(conf); nodeStatusUpdater.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 41e5f1f..05014e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -200,8 +200,8 @@ public class TestContainerManager extends BaseContainerManagerTest { @Override protected ContainerManagerImpl createContainerManager(DeletionService delSrvc) { - return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, - metrics, dirsHandler) { + return new ContainerManagerImpl(context, exec, delSrvc, + getNodeStatusUpdater(), metrics, dirsHandler) { @Override protected UserGroupInformation getRemoteUgi() throws YarnException { @@ -1744,7 +1744,7 @@ public class TestContainerManager extends BaseContainerManagerTest { @Test public void testNullTokens() throws Exception { ContainerManagerImpl cMgrImpl = - new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, + new ContainerManagerImpl(context, exec, delSrvc, getNodeStatusUpdater(), metrics, dirsHandler); String strExceptionMsg = ""; try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestNMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestNMProxy.java index 5f023f0..32ff572 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestNMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestNMProxy.java @@ -65,8 +65,8 @@ public class TestNMProxy extends BaseContainerManagerTest { @Override protected ContainerManagerImpl createContainerManager(DeletionService delSrvc) { - return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, - metrics, dirsHandler) { + return new ContainerManagerImpl(context, exec, delSrvc, + getNodeStatusUpdater(), metrics, dirsHandler) { @Override public StartContainersResponse startContainers( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java index b21850c..508b8bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java @@ -131,7 +131,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest { protected ContainerManagerImpl createContainerManager( DeletionService delSrvc) { return new ContainerManagerImpl(context, exec, delSrvc, - nodeStatusUpdater, metrics, dirsHandler) { + getNodeStatusUpdater(), metrics, dirsHandler) { @Override protected UserGroupInformation getRemoteUgi() throws YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 2c89ddd..7d6feea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -335,6 +335,7 @@ public class ResourceTrackerService extends AbstractService implements Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); Resource physicalResource = request.getPhysicalResource(); + NodeStatus nodeStatus = request.getNodeStatus(); RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); @@ -426,7 +427,7 @@ public class ResourceTrackerService extends AbstractService implements if (oldNode == null) { RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(), - request.getRunningApplications()); + request.getRunningApplications(), nodeStatus); if (request.getLogAggregationReportsForApps() != null && !request.getLogAggregationReportsForApps().isEmpty()) { if (LOG.isDebugEnabled()) { @@ -462,7 +463,7 @@ public class ResourceTrackerService extends AbstractService implements this.rmContext.getRMNodes().put(nodeId, rmNode); this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeStartedEvent(nodeId, null, null)); + .handle(new RMNodeStartedEvent(nodeId, null, null, nodeStatus)); } else { // Reset heartbeat ID since node just restarted. oldNode.resetLastNodeHeartBeatResponse(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index f32a863..daf03e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.collections.keyvalue.DefaultMapEntry; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -208,7 +209,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { RMNodeEventType, RMNodeEvent>(NodeState.NEW) //Transitions from NEW state - .addTransition(NodeState.NEW, NodeState.RUNNING, + .addTransition(NodeState.NEW, + EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY), RMNodeEventType.STARTED, new AddNodeTransition()) .addTransition(NodeState.NEW, NodeState.NEW, RMNodeEventType.RESOURCE_UPDATE, @@ -704,7 +706,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { private void updateMetricsForRejoinedNode(NodeState previousNodeState) { ClusterMetrics metrics = ClusterMetrics.getMetrics(); - metrics.incrNumActiveNodes(); switch (previousNodeState) { case LOST: @@ -847,10 +848,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { } public static class AddNodeTransition implements - SingleArcTransition<RMNodeImpl, RMNodeEvent> { + MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> { @Override - public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event; List<NMContainerStatus> containers = null; @@ -868,8 +869,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { if (previousRMNode != null) { ClusterMetrics.getMetrics().decrDecommisionedNMs(); } - // Increment activeNodes explicitly because this is a new node. - ClusterMetrics.getMetrics().incrNumActiveNodes(); containers = startEvent.getNMContainerStatuses(); if (containers != null && !containers.isEmpty()) { for (NMContainerStatus container : containers) { @@ -886,17 +885,37 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { } } - rmNode.context.getDispatcher().getEventHandler() - .handle(new NodeAddedSchedulerEvent(rmNode, containers)); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_USABLE, rmNode)); + NodeState nodeState; + NodeStatus nodeStatus = + startEvent.getNodeStatus(); + + if (nodeStatus == null) { + nodeState = NodeState.RUNNING; + reportNodeRunning(rmNode, containers); + } else { + RMNodeStatusEvent rmNodeStatusEvent = + new RMNodeStatusEvent(nodeId, nodeStatus); + + NodeHealthStatus nodeHealthStatus = + updateRMNodeFromStatusEvents(rmNode, rmNodeStatusEvent); + + if (nodeHealthStatus.getIsNodeHealthy()) { + nodeState = NodeState.RUNNING; + reportNodeRunning(rmNode, containers); + } else { + nodeState = NodeState.UNHEALTHY; + reportNodeUnusable(rmNode, nodeState); + } + } + List<LogAggregationReport> logAggregationReportsForApps = startEvent.getLogAggregationReportsForApps(); if (logAggregationReportsForApps != null && !logAggregationReportsForApps.isEmpty()) { rmNode.handleLogAggregationStatus(logAggregationReportsForApps); } + + return nodeState; } } @@ -1108,6 +1127,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { } /** + * Report node is RUNNING. + * @param rmNode + * @param containers + */ + public static void reportNodeRunning(RMNodeImpl rmNode, + List<NMContainerStatus> containers) { + rmNode.context.getDispatcher().getEventHandler() + .handle(new NodeAddedSchedulerEvent(rmNode, containers)); + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); + // Increment activeNodes explicitly because this is a new node. + ClusterMetrics.getMetrics().incrNumActiveNodes(); + } + + /** * Report node is UNUSABLE and update metrics. * @param rmNode * @param finalState @@ -1299,6 +1334,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { // notifiers get update metadata because they will very likely query it // upon notification // Update metrics + ClusterMetrics.getMetrics().incrNumActiveNodes(); rmNode.updateMetricsForRejoinedNode(NodeState.UNHEALTHY); return NodeState.RUNNING; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java index 3976994..2bf04d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java @@ -24,19 +24,23 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; public class RMNodeStartedEvent extends RMNodeEvent { + private final NodeStatus nodeStatus; private List<NMContainerStatus> containerStatuses; private List<ApplicationId> runningApplications; private List<LogAggregationReport> logAggregationReportsForApps; public RMNodeStartedEvent(NodeId nodeId, List<NMContainerStatus> containerReports, - List<ApplicationId> runningApplications) { + List<ApplicationId> runningApplications, + NodeStatus nodeStatus) { super(nodeId, RMNodeEventType.STARTED); this.containerStatuses = containerReports; this.runningApplications = runningApplications; + this.nodeStatus = nodeStatus; } public List<NMContainerStatus> getNMContainerStatuses() { @@ -47,6 +51,10 @@ public class RMNodeStartedEvent extends RMNodeEvent { return runningApplications; } + public NodeStatus getNodeStatus() { + return nodeStatus; + } + public List<LogAggregationReport> getLogAggregationReportsForApps() { return this.logAggregationReportsForApps; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 3543bc4..d433753 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -187,6 +190,17 @@ public class MockNM { req.setNodeLabels(nodeLabels); } + NodeStatus status = Records.newRecord(NodeStatus.class); + status.setResponseId(0); + status.setNodeId(nodeId); + status.setContainersStatuses(new ArrayList<>(containerStats.values())); + NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class); + healthStatus.setHealthReport(""); + healthStatus.setIsNodeHealthy(true); + healthStatus.setLastHealthReportTime(1); + status.setNodeHealthStatus(healthStatus); + req.setNodeStatus(status); + RegisterNodeManagerResponse registrationResponse = resourceTracker.registerNodeManager(req); this.currentContainerTokenMasterKey = @@ -364,6 +378,14 @@ public class MockNM { return heartbeatResponse; } + public static NodeStatus createMockNodeStatus() { + NodeStatus mockNodeStatus = mock(NodeStatus.class); + NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class); + when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus); + when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true); + return mockNodeStatus; + } + public long getMemory() { return capability.getMemorySize(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index b3888c3..90c5543 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.token.Token; @@ -54,6 +56,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; @@ -543,7 +546,9 @@ public class MockRM extends ResourceManager { public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); - node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null)); + NodeStatus mockNodeStatus = createMockNodeStatus(); + node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null, + mockNodeStatus)); drainEventsImplicitly(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index 1e4b050..06c4527 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -98,7 +98,7 @@ public class NodeManager implements ContainerManagementProtocol { public NodeManager(String hostName, int containerManagerPort, int httpPort, String rackName, Resource capability, - ResourceManager resourceManager) + ResourceManager resourceManager, NodeStatus nodestatus) throws IOException, YarnException { this.containerManagerAddress = hostName + ":" + containerManagerPort; this.nodeHttpAddress = hostName + ":" + httpPort; @@ -113,6 +113,7 @@ public class NodeManager implements ContainerManagementProtocol { request.setResource(capability); request.setNodeId(this.nodeId); request.setNMVersion(YarnVersionInfo.getVersion()); + request.setNodeStatus(nodestatus); resourceTrackerService.registerNodeManager(request); this.resourceManager = resourceManager; resourceManager.getResourceScheduler().getNodeReport(this.nodeId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 205b582..931a2e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -218,8 +219,9 @@ public class TestRMNodeTransitions { @Test (timeout = 5000) public void testExpiredContainer() { + NodeStatus mockNodeStatus = createMockNodeStatus(); // Start the node - node.handle(new RMNodeStartedEvent(null, null, null)); + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); verify(scheduler).handle(any(NodeAddedSchedulerEvent.class)); // Expire a container @@ -282,12 +284,13 @@ public class TestRMNodeTransitions { @Test (timeout = 5000) public void testContainerUpdate() throws InterruptedException{ + NodeStatus mockNodeStatus = createMockNodeStatus(); //Start the node - node.handle(new RMNodeStartedEvent(null, null, null)); + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); - node2.handle(new RMNodeStartedEvent(null, null, null)); + node2.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); ApplicationId app0 = BuilderUtils.newApplicationId(0, 0); ApplicationId app1 = BuilderUtils.newApplicationId(1, 1); @@ -343,8 +346,9 @@ public class TestRMNodeTransitions { @Test (timeout = 5000) public void testStatusChange(){ + NodeStatus mockNodeStatus = createMockNodeStatus(); //Start the node - node.handle(new RMNodeStartedEvent(null, null, null)); + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); //Add info to the queue first node.setNextHeartBeat(false); @@ -610,6 +614,33 @@ public class TestRMNodeTransitions { } @Test + public void testAddUnhealthyNode() { + ClusterMetrics cm = ClusterMetrics.getMetrics(); + int initialUnhealthy = cm.getUnhealthyNMs(); + int initialActive = cm.getNumActiveNMs(); + int initialLost = cm.getNumLostNMs(); + int initialDecommissioned = cm.getNumDecommisionedNMs(); + int initialRebooted = cm.getNumRebootedNMs(); + + NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick", + System.currentTimeMillis()); + NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0, + new ArrayList<>(), null, status, null, null, null); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + nodeStatus)); + + Assert.assertEquals("Unhealthy Nodes", + initialUnhealthy + 1, cm.getUnhealthyNMs()); + Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs()); + Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); + Assert.assertEquals("Decommissioned Nodes", + initialDecommissioned, cm.getNumDecommisionedNMs()); + Assert.assertEquals("Rebooted Nodes", + initialRebooted, cm.getNumRebootedNMs()); + Assert.assertEquals(NodeState.UNHEALTHY, node.getState()); + } + + @Test public void testNMShutdown() { RMNodeImpl node = getRunningNode(); node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.SHUTDOWN)); @@ -714,7 +745,9 @@ public class TestRMNodeTransitions { Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, capability, nmVersion); - node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + NodeStatus mockNodeStatus = createMockNodeStatus(); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + mockNodeStatus)); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; } @@ -765,7 +798,10 @@ public class TestRMNodeTransitions { Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, null, capability, null); - node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + NodeStatus mockNodeStatus = createMockNodeStatus(); + + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + mockNodeStatus)); Assert.assertEquals(NodeState.RUNNING, node.getState()); node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.REBOOTING)); Assert.assertEquals(NodeState.REBOOTED, node.getState()); @@ -781,7 +817,9 @@ public class TestRMNodeTransitions { int initialUnhealthy = cm.getUnhealthyNMs(); int initialDecommissioned = cm.getNumDecommisionedNMs(); int initialRebooted = cm.getNumRebootedNMs(); - node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + NodeStatus mockNodeStatus = createMockNodeStatus(); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + mockNodeStatus)); Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs()); Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); Assert.assertEquals("Unhealthy Nodes", @@ -1080,8 +1118,9 @@ public class TestRMNodeTransitions { @Test public void testForHandlingDuplicatedCompltedContainers() { + NodeStatus mockNodeStatus = createMockNodeStatus(); // Start the node - node.handle(new RMNodeStartedEvent(null, null, null)); + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); // Add info to the queue first node.setNextHeartBeat(false); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java index 411b848..1cb5e1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; @@ -88,12 +90,12 @@ public class TestResourceManager { private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode(String hostName, int containerManagerPort, int httpPort, - String rackName, Resource capability) throws IOException, - YarnException { + String rackName, Resource capability, NodeStatus nodeStatus) + throws IOException, YarnException { org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( hostName, containerManagerPort, httpPort, rackName, capability, - resourceManager); + resourceManager, nodeStatus); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext() .getRMNodes().get(nm.getNodeId())); @@ -109,26 +111,30 @@ public class TestResourceManager { final int memory = 4 * 1024; final int vcores = 4; - + + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host1 = "host1"; org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(memory, vcores)); + Resources.createResource(memory, vcores), mockNodeStatus); // Register node2 String host2 = "host2"; org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 = registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(memory/2, vcores/2)); + Resources.createResource(memory/2, vcores/2), mockNodeStatus); // nodes should be in RUNNING state RMNodeImpl node1 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get( nm1.getNodeId()); RMNodeImpl node2 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get( nm2.getNodeId()); - node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null)); - node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null)); + node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null, + mockNodeStatus)); + node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null, + mockNodeStatus)); // Submit an application Application application = new Application("user1", resourceManager); @@ -216,9 +222,12 @@ public class TestResourceManager { public void testNodeHealthReportIsNotNull() throws Exception{ String host1 = "host1"; final int memory = 4 * 1024; + + NodeStatus mockNodeStatus = createMockNodeStatus(); + org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = - registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(memory, 1)); + registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(memory, 1), mockNodeStatus); nm1.heartbeat(); nm1.heartbeat(); Collection<RMNode> values = resourceManager.getRMContext().getRMNodes().values(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 6690339..066e394 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.server.api.ServerRMProxy; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore; + +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -2712,10 +2714,14 @@ public class TestResourceTrackerService extends NodeLabelTestBase { RegisterNodeManagerRequest.class); NodeId nodeId = NodeId.newInstance("host2", 1234); Resource capability = BuilderUtils.newResource(1024, 1); + + NodeStatus mockNodeStatus = createMockNodeStatus(); + req.setResource(capability); req.setNodeId(nodeId); req.setHttpPort(1234); req.setNMVersion(YarnVersionInfo.getVersion()); + req.setNodeStatus(mockNodeStatus); ContainerId c1 = ContainerId.newContainerId(appAttemptId, 1); ContainerId c2 = ContainerId.newContainerId(appAttemptId, 2); ContainerId c3 = ContainerId.newContainerId(appAttemptId, 3); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 8d31fe1..6836288 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.logaggregationstatus; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -139,13 +140,15 @@ public class TestRMAppLogAggregationStatus { Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node1 = new RMNodeImpl(nodeId1, rmContext, null, 0, 0, null, capability, null); - node1.handle(new RMNodeStartedEvent(nodeId1, null, null)); + NodeStatus mockNodeStatus = createMockNodeStatus(); + node1.handle(new RMNodeStartedEvent(nodeId1, null, null, mockNodeStatus)); rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId1)); NodeId nodeId2 = NodeId.newInstance("localhost", 2345); RMNodeImpl node2 = new RMNodeImpl(nodeId2, rmContext, null, 0, 0, null, capability, null); - node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null)); + node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null, + mockNodeStatus)); rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId2)); // The initial log aggregation status for these two nodes diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java index f69faf4..017a1e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; + +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.junit.Assert; import org.slf4j.Logger; @@ -135,12 +138,15 @@ public class TestNMExpiry { String hostname3 = "localhost3"; Resource capability = BuilderUtils.newResource(1024, 1); + NodeStatus mockNodeStatus = createMockNodeStatus(); + RegisterNodeManagerRequest request1 = recordFactory .newRecordInstance(RegisterNodeManagerRequest.class); NodeId nodeId1 = NodeId.newInstance(hostname1, 0); request1.setNodeId(nodeId1); request1.setHttpPort(0); request1.setResource(capability); + request1.setNodeStatus(mockNodeStatus); resourceTrackerService.registerNodeManager(request1); RegisterNodeManagerRequest request2 = recordFactory @@ -149,6 +155,7 @@ public class TestNMExpiry { request2.setNodeId(nodeId2); request2.setHttpPort(0); request2.setResource(capability); + request2.setNodeStatus(mockNodeStatus); resourceTrackerService.registerNodeManager(request2); int waitCount = 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index 3c4e6b4..817fb9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase; @@ -178,9 +181,13 @@ public class TestNMReconnect extends ParameterizedSchedulerTestBase { RegisterNodeManagerRequest request1 = recordFactory .newRecordInstance(RegisterNodeManagerRequest.class); NodeId nodeId1 = NodeId.newInstance(hostname1, 0); + + NodeStatus mockNodeStatus = createMockNodeStatus(); + request1.setNodeId(nodeId1); request1.setHttpPort(0); request1.setResource(capability); + request1.setNodeStatus(mockNodeStatus); resourceTrackerService.registerNodeManager(request1); Assert.assertNotNull(context.getRMNodes().get(nodeId1)); // verify Scheduler and RMContext use same RMNode reference. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index e67deb5..2860335 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; @@ -1051,9 +1053,12 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { RegisterNodeManagerRequest request1 = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); NodeId nodeId1 = NodeId.newInstance(hostname1, 0); + NodeStatus mockNodeStatus = createMockNodeStatus(); + request1.setNodeId(nodeId1); request1.setHttpPort(0); request1.setResource(capability); + request1.setNodeStatus(mockNodeStatus); privateResourceTrackerService.registerNodeManager(request1); privateDispatcher.await(); Resource clusterResource = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java index 83a354d..a75be77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.Application; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -43,6 +44,7 @@ import org.junit.Test; import java.io.IOException; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.junit.Assume.assumeTrue; public class TestSchedulerHealth { @@ -170,11 +172,11 @@ public class TestSchedulerHealth { } private NodeManager registerNode(String hostName, int containerManagerPort, - int httpPort, String rackName, Resource capability) throws IOException, - YarnException { + int httpPort, String rackName, Resource capability, NodeStatus nodeStatus) + throws IOException, YarnException { NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, rackName, - capability, resourceManager); + capability, resourceManager, nodeStatus); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() .get(nm.getNodeId())); @@ -200,11 +202,13 @@ public class TestSchedulerHealth { assumeTrue("This test is only supported on Capacity Scheduler", isCapacityScheduler); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * 1024, 1)); + Resources.createResource(5 * 1024, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -275,15 +279,17 @@ public class TestSchedulerHealth { assumeTrue("This test is only supported on Capacity Scheduler", isCapacityScheduler); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register nodes String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * 1024, 1)); + Resources.createResource(2 * 1024, 1), mockNodeStatus); String host_1 = "host_1"; NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * 1024, 1)); + Resources.createResource(5 * 1024, 1), mockNodeStatus); nodeUpdate(nm_0); nodeUpdate(nm_1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 3c01395..a272851 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.assertj.core.api.Assertions.assertThat; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB; @@ -54,6 +55,7 @@ import java.util.concurrent.CyclicBarrier; import com.google.common.collect.Sets; import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -242,9 +244,10 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { private NodeManager registerNode(ResourceManager rm, String hostName, int containerManagerPort, int httpPort, String rackName, - Resource capability) throws IOException, YarnException { + Resource capability, NodeStatus nodeStatus) + throws IOException, YarnException { NodeManager nm = new NodeManager(hostName, - containerManagerPort, httpPort, rackName, capability, rm); + containerManagerPort, httpPort, rackName, capability, rm, nodeStatus); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes() .get(nm.getNodeId())); @@ -286,11 +289,11 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { } private NodeManager registerNode(String hostName, int containerManagerPort, - int httpPort, String rackName, - Resource capability) - throws IOException, YarnException { + int httpPort, String rackName, + Resource capability, NodeStatus nodeStatus) + throws IOException, YarnException { NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, - rackName, capability, resourceManager); + rackName, capability, resourceManager, nodeStatus); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext() .getRMNodes().get(nm.getNodeId())); @@ -303,17 +306,19 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { LOG.info("--- START: testCapacityScheduler ---"); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); + Resources.createResource(4 * GB, 1), mockNodeStatus); // Register node2 String host_1 = "host_1"; NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + Resources.createResource(2 * GB, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -443,11 +448,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { when(mC.getConfigurationProvider()).thenReturn( new LocalConfigurationProvider()); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host0 = "host_0"; NodeManager nm0 = registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(10 * GB, 10)); + Resources.createResource(10 * GB, 10), mockNodeStatus); // ResourceRequest priorities Priority priority0 = Priority.newInstance(0); @@ -545,11 +552,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { when(mC.getConfigurationProvider()).thenReturn( new LocalConfigurationProvider()); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host0 = "host_0"; NodeManager nm0 = registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(10 * GB, 10)); + Resources.createResource(10 * GB, 10), mockNodeStatus); // ResourceRequest priorities Priority priority0 = Priority.newInstance(0); @@ -2097,17 +2106,20 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); + Resources.createResource(4 * GB, 1), mockNodeStatus); // Register node2 String host_1 = "host_1"; NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + Resources.createResource(2 * GB, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -2213,17 +2225,19 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + Resources.createResource(5 * GB, 1), mockNodeStatus); // Register node2 String host_1 = "host_1"; NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + Resources.createResource(5 * GB, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -2335,11 +2349,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(6 * GB, 1)); + Resources.createResource(6 * GB, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -2383,17 +2399,19 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { public void testMoveAppQueueMetricsCheck() throws Exception { ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + Resources.createResource(5 * GB, 1), mockNodeStatus); // Register node2 String host_1 = "host_1"; NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + Resources.createResource(5 * GB, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -4594,9 +4612,12 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { } @Test public void testRemovedNodeDecomissioningNode() throws Exception { + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register nodemanager NodeManager nm = registerNode("host_decom", 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); @@ -4639,10 +4660,14 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { ((CapacityScheduler) resourceManager.getResourceScheduler()) .setRMContext(spyContext); ((AsyncDispatcher) mockDispatcher).start(); + + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index e735af4..7882ba3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -124,6 +125,7 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.assertj.core.api.Assertions.assertThat; import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES; import static org.junit.Assert.assertEquals; @@ -4863,9 +4865,12 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testRemovedNodeDecomissioningNode() throws Exception { + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register nodemanager NodeManager nm = registerNode("host_decom", 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); @@ -4908,10 +4913,14 @@ public class TestFairScheduler extends FairSchedulerTestBase { ((FairScheduler) resourceManager.getResourceScheduler()) .setRMContext(spyContext); ((AsyncDispatcher) mockDispatcher).start(); + + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId()); @@ -4950,11 +4959,13 @@ public class TestFairScheduler extends FairSchedulerTestBase { } private NodeManager registerNode(String hostName, int containerManagerPort, - int httpPort, String rackName, - Resource capability) + int httpPort, String rackName, + Resource capability, NodeStatus nodeStatus) throws IOException, YarnException { + NodeStatus mockNodeStatus = createMockNodeStatus(); + NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, - rackName, capability, resourceManager); + rackName, capability, resourceManager, mockNodeStatus); // after YARN-5375, scheduler event is processed in rm main dispatcher, // wait it processed, or may lead dead lock diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 01fb6a7..9b3657e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -33,6 +34,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; @@ -143,10 +145,10 @@ public class TestFifoScheduler { private NodeManager registerNode(String hostName, int containerManagerPort, int nmHttpPort, String rackName, - Resource capability) + Resource capability, NodeStatus nodeStatus) throws IOException, YarnException { NodeManager nm = new NodeManager(hostName, containerManagerPort, - nmHttpPort, rackName, capability, resourceManager); + nmHttpPort, rackName, capability, resourceManager, nodeStatus); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() .get(nm.getNodeId())); @@ -406,19 +408,21 @@ public class TestFifoScheduler { LOG.info("--- START: testFifoScheduler ---"); final int GB = 1024; - + + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); + Resources.createResource(4 * GB, 1), mockNodeStatus); nm_0.heartbeat(); // Register node2 String host_1 = "host_1"; org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + Resources.createResource(2 * GB, 1), mockNodeStatus); nm_1.heartbeat(); // ResourceRequest priorities @@ -1197,9 +1201,12 @@ public class TestFifoScheduler { @Test public void testRemovedNodeDecomissioningNode() throws Exception { + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register nodemanager NodeManager nm = registerNode("host_decom", 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); @@ -1242,10 +1249,14 @@ public class TestFifoScheduler { ((FifoScheduler) resourceManager.getResourceScheduler()) .setRMContext(spyContext); ((AsyncDispatcher) mockDispatcher).start(); + + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java index c3f41f6..dc028fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.assertj.core.api.Assertions.assertThat; import static org.apache.hadoop.yarn.webapp.WebServicesTestUtils.assertResponseStatusCode; import static org.junit.Assert.assertEquals; @@ -241,8 +242,10 @@ public class TestRMWebServicesNodes extends JerseyTestBase { } private void sendStartedEvent(RMNode node) { + NodeStatus mockNodeStatus = createMockNodeStatus(); ((RMNodeImpl) node) - .handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + .handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + mockNodeStatus)); } private void sendLostEvent(RMNode node) { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org