YARN-8925. Updating distributed node attributes only when necessary. Contributed by Tao Yang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7deef08e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7deef08e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7deef08e Branch: refs/heads/branch-3.2 Commit: 7deef08eb8574ec5067620f3dcf08c444710de65 Parents: 9c89e2e Author: Weiwei Yang <[email protected]> Authored: Fri Dec 21 16:31:03 2018 +0800 Committer: Weiwei Yang <[email protected]> Committed: Fri Dec 21 16:31:03 2018 +0800 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 6 + .../hadoop/yarn/nodelabels/NodeLabelUtil.java | 51 ++- .../src/main/resources/yarn-default.xml | 9 + .../yarn/nodelabels/TestNodeLabelUtil.java | 71 +++ .../protocolrecords/NodeHeartbeatResponse.java | 5 + .../RegisterNodeManagerRequest.java | 16 + .../RegisterNodeManagerResponse.java | 5 + .../impl/pb/NodeHeartbeatResponsePBImpl.java | 15 + .../pb/RegisterNodeManagerRequestPBImpl.java | 53 +++ .../pb/RegisterNodeManagerResponsePBImpl.java | 15 + .../yarn_server_common_service_protos.proto | 3 + .../nodemanager/NodeStatusUpdaterImpl.java | 331 ++++++++++---- .../TestNodeStatusUpdaterForAttributes.java | 439 ++++++++++++++++++ .../resourcemanager/ResourceTrackerService.java | 103 ++++- .../nodelabels/NodeAttributesManagerImpl.java | 13 +- .../TestResourceTrackerService.java | 456 ++++++++++++++++++- 16 files changed, 1479 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7deef08e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 95861d7..c5abec7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3596,6 +3596,12 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL = 2 * 60 * 1000; + public static final String NM_NODE_ATTRIBUTES_RESYNC_INTERVAL = + NM_NODE_ATTRIBUTES_PREFIX + "resync-interval-ms"; + + public static final long DEFAULT_NM_NODE_ATTRIBUTES_RESYNC_INTERVAL = + 2 * 60 * 1000; + // If -1 is configured then no timer task should be created public static final String NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS = NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7deef08e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java index 395ff81..c313998 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeAttributeKey; import java.io.IOException; +import java.util.Objects; import java.util.Set; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -40,6 +41,8 @@ public final class NodeLabelUtil { Pattern.compile("^[0-9a-zA-Z][0-9a-zA-Z-_\\.]*"); private static final Pattern ATTRIBUTE_VALUE_PATTERN = Pattern.compile("^[0-9a-zA-Z][0-9a-zA-Z-_.]*"); + private static final Pattern ATTRIBUTE_NAME_PATTERN = + Pattern.compile("^[0-9a-zA-Z][0-9a-zA-Z-_]*"); public static void checkAndThrowLabelName(String label) throws IOException { if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) { @@ -57,6 +60,25 @@ public final class NodeLabelUtil { } } + public static void checkAndThrowAttributeName(String attributeName) + throws IOException { + if (attributeName == null || attributeName.isEmpty() + || attributeName.length() > MAX_LABEL_LENGTH) { + throw new IOException( + "attribute name added is empty or exceeds " + MAX_LABEL_LENGTH + + " character(s)"); + } + attributeName = attributeName.trim(); + + boolean match = ATTRIBUTE_NAME_PATTERN.matcher(attributeName).matches(); + + if (!match) { + throw new IOException("attribute name should only contains " + + "{0-9, a-z, A-Z, -, _} and should not started with {-,_}" + + ", now it is= " + attributeName); + } + } + public static void checkAndThrowAttributeValue(String value) throws IOException { if (value == null) { @@ -129,7 +151,9 @@ public final class NodeLabelUtil { // Verify attribute prefix format. checkAndThrowAttributePrefix(prefix); // Verify attribute name format. - checkAndThrowLabelName(attributeKey.getAttributeName()); + checkAndThrowAttributeName(attributeKey.getAttributeName()); + // Verify attribute value format. + checkAndThrowAttributeValue(nodeAttribute.getAttributeValue()); } } } @@ -152,4 +176,29 @@ public final class NodeLabelUtil { .equals(nodeAttribute.getAttributeKey().getAttributePrefix())) .collect(Collectors.toSet()); } + + /** + * Are these two input node attributes the same. + * @return true if they are the same + */ + public static boolean isNodeAttributesEquals( + Set<NodeAttribute> leftNodeAttributes, + Set<NodeAttribute> rightNodeAttributes) { + if (leftNodeAttributes == null && rightNodeAttributes == null) { + return true; + } else if (leftNodeAttributes == null || rightNodeAttributes == null + || leftNodeAttributes.size() != rightNodeAttributes.size()) { + return false; + } + return leftNodeAttributes.stream() + .allMatch(e -> isNodeAttributeIncludes(rightNodeAttributes, e)); + } + + private static boolean isNodeAttributeIncludes( + Set<NodeAttribute> nodeAttributes, NodeAttribute checkNodeAttribute) { + return nodeAttributes.stream().anyMatch( + e -> e.equals(checkNodeAttribute) && Objects + .equals(e.getAttributeValue(), + checkNodeAttribute.getAttributeValue())); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7deef08e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index ba5a96e..6abcb68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2978,6 +2978,15 @@ <property> <description> + Interval at which NM syncs its node attributes with RM. NM will send its loaded + attributes every x intervals configured, along with heartbeat to RM. + </description> + <name>yarn.nodemanager.node-attributes.resync-interval-ms</name> + <value>120000</value> + </property> + + <property> + <description> Timeout in seconds for YARN node graceful decommission. This is the maximal time to wait for running containers and applications to complete before transition a DECOMMISSIONING node into DECOMMISSIONED. http://git-wip-us.apache.org/repos/asf/hadoop/blob/7deef08e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestNodeLabelUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestNodeLabelUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestNodeLabelUtil.java index afdfcbb..060e38d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestNodeLabelUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestNodeLabelUtil.java @@ -18,6 +18,11 @@ package org.apache.hadoop.yarn.nodelabels; import static org.junit.Assert.fail; + +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; +import org.junit.Assert; import org.junit.Test; /** @@ -48,4 +53,70 @@ public class TestNodeLabelUtil { } } } + + @Test + public void testIsNodeAttributesEquals() { + NodeAttribute nodeAttributeCK1V1 = NodeAttribute + .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "K1", + NodeAttributeType.STRING, "V1"); + NodeAttribute nodeAttributeCK1V1Copy = NodeAttribute + .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "K1", + NodeAttributeType.STRING, "V1"); + NodeAttribute nodeAttributeDK1V1 = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "K1", + NodeAttributeType.STRING, "V1"); + NodeAttribute nodeAttributeDK1V1Copy = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "K1", + NodeAttributeType.STRING, "V1"); + NodeAttribute nodeAttributeDK2V1 = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "K2", + NodeAttributeType.STRING, "V1"); + NodeAttribute nodeAttributeDK2V2 = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "K2", + NodeAttributeType.STRING, "V2"); + /* + * equals if set size equals and items are all the same + */ + Assert.assertTrue(NodeLabelUtil.isNodeAttributesEquals(null, null)); + Assert.assertTrue(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(), ImmutableSet.of())); + Assert.assertTrue(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeCK1V1), + ImmutableSet.of(nodeAttributeCK1V1Copy))); + Assert.assertTrue(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeDK1V1), + ImmutableSet.of(nodeAttributeDK1V1Copy))); + Assert.assertTrue(NodeLabelUtil.isNodeAttributesEquals( + ImmutableSet.of(nodeAttributeCK1V1, nodeAttributeDK1V1), + ImmutableSet.of(nodeAttributeCK1V1Copy, nodeAttributeDK1V1Copy))); + /* + * not equals if set size not equals or items are different + */ + Assert.assertFalse( + NodeLabelUtil.isNodeAttributesEquals(null, ImmutableSet.of())); + Assert.assertFalse( + NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(), null)); + // different attribute prefix + Assert.assertFalse(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeCK1V1), + ImmutableSet.of(nodeAttributeDK1V1))); + // different attribute name + Assert.assertFalse(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeDK1V1), + ImmutableSet.of(nodeAttributeDK2V1))); + // different attribute value + Assert.assertFalse(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeDK2V1), + ImmutableSet.of(nodeAttributeDK2V2))); + // different set + Assert.assertFalse(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeCK1V1), + ImmutableSet.of())); + Assert.assertFalse(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeCK1V1), + ImmutableSet.of(nodeAttributeCK1V1, nodeAttributeDK1V1))); + Assert.assertFalse(NodeLabelUtil.isNodeAttributesEquals( + ImmutableSet.of(nodeAttributeCK1V1, nodeAttributeDK1V1), + ImmutableSet.of(nodeAttributeDK1V1))); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7deef08e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 05a9c72..acb7644 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -118,4 +118,9 @@ public abstract class NodeHeartbeatResponse { public abstract void addAllContainersToDecrease( Collection<Container> containersToDecrease); + + public abstract boolean getAreNodeAttributesAcceptedByRM(); + + public abstract void setAreNodeAttributesAcceptedByRM( + boolean areNodeAttributesAcceptedByRM); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7deef08e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index ff50330..acec16f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; @@ -50,6 +51,16 @@ public abstract class RegisterNodeManagerRequest { List<NMContainerStatus> containerStatuses, List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels, Resource physicalResource) { + return newInstance(nodeId, httpPort, resource, nodeManagerVersionId, + containerStatuses, runningApplications, nodeLabels, physicalResource, + null); + } + + public static RegisterNodeManagerRequest newInstance(NodeId nodeId, + int httpPort, Resource resource, String nodeManagerVersionId, + List<NMContainerStatus> containerStatuses, + List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels, + Resource physicalResource, Set<NodeAttribute> nodeAttributes) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -60,6 +71,7 @@ public abstract class RegisterNodeManagerRequest { request.setRunningApplications(runningApplications); request.setNodeLabels(nodeLabels); request.setPhysicalResource(physicalResource); + request.setNodeAttributes(nodeAttributes); return request; } @@ -117,4 +129,8 @@ public abstract class RegisterNodeManagerRequest { public abstract void setLogAggregationReportsForApps( List<LogAggregationReport> logAggregationReportsForApps); + + public abstract Set<NodeAttribute> getNodeAttributes(); + + public abstract void setNodeAttributes(Set<NodeAttribute> nodeAttributes); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7deef08e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java index 675b375..d54a567 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java @@ -58,4 +58,9 @@ public abstract class RegisterNodeManagerResponse { public abstract void setAreNodeLabelsAcceptedByRM( boolean areNodeLabelsAcceptedByRM); + + public abstract boolean getAreNodeAttributesAcceptedByRM(); + + public abstract void setAreNodeAttributesAcceptedByRM( + boolean areNodeAttributesAcceptedByRM); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7deef08e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 9af5bfc..a53ea4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -788,6 +788,21 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { } @Override + public boolean getAreNodeAttributesAcceptedByRM() { + NodeHeartbeatResponseProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getAreNodeAttributesAcceptedByRM(); + } + + @Override + public void setAreNodeAttributesAcceptedByRM( + boolean areNodeAttributesAcceptedByRM) { + maybeInitBuilder(); + this.builder + .setAreNodeAttributesAcceptedByRM(areNodeAttributesAcceptedByRM); + } + + @Override public List<SignalContainerRequest> getContainersToSignalList() { initContainersToSignal(); return this.containersToSignal; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7deef08e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 02fd20f..317f8ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -26,22 +26,26 @@ import java.util.List; import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeAttributePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeAttributesProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; @@ -58,6 +62,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest private List<NMContainerStatus> containerStatuses = null; private List<ApplicationId> runningApplications = null; private Set<NodeLabel> labels = null; + private Set<NodeAttribute> attributes = null; private List<LogAggregationReport> logAggregationReportsForApps = null; @@ -101,6 +106,15 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest } builder.setNodeLabels(newBuilder.build()); } + if (this.attributes != null) { + builder.clearNodeAttributes(); + NodeAttributesProto.Builder attributesBuilder = + NodeAttributesProto.newBuilder(); + for (NodeAttribute attribute : attributes) { + attributesBuilder.addNodeAttributes(convertToProtoFormat(attribute)); + } + builder.setNodeAttributes(attributesBuilder.build()); + } if (this.physicalResource != null) { builder.setPhysicalResource(convertToProtoFormat(this.physicalResource)); } @@ -404,6 +418,36 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest } } + @Override + public synchronized Set<NodeAttribute> getNodeAttributes() { + initNodeAttributes(); + return this.attributes; + } + + @Override + public synchronized void setNodeAttributes( + Set<NodeAttribute> nodeAttributes) { + maybeInitBuilder(); + builder.clearNodeAttributes(); + this.attributes = nodeAttributes; + } + + private synchronized void initNodeAttributes() { + if (this.attributes != null) { + return; + } + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasNodeAttributes()) { + attributes=null; + return; + } + NodeAttributesProto nodeAttributes = p.getNodeAttributes(); + attributes = new HashSet<>(); + for(NodeAttributeProto nap : nodeAttributes.getNodeAttributesList()) { + attributes.add(convertFromProtoFormat(nap)); + } + } + private static NodeLabelPBImpl convertFromProtoFormat(NodeLabelProto p) { return new NodeLabelPBImpl(p); } @@ -412,6 +456,15 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest return ((NodeLabelPBImpl)t).getProto(); } + private static NodeAttributePBImpl convertFromProtoFormat( + NodeAttributeProto p) { + return new NodeAttributePBImpl(p); + } + + private static NodeAttributeProto convertToProtoFormat(NodeAttribute t) { + return ((NodeAttributePBImpl)t).getProto(); + } + private static ApplicationIdPBImpl convertFromProtoFormat( ApplicationIdProto p) { return new ApplicationIdPBImpl(p); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7deef08e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java index 6321309..4e4ca3c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java @@ -269,4 +269,19 @@ public class RegisterNodeManagerResponsePBImpl maybeInitBuilder(); this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM); } + + @Override + public boolean getAreNodeAttributesAcceptedByRM() { + RegisterNodeManagerResponseProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getAreNodeAttributesAcceptedByRM(); + } + + @Override + public void setAreNodeAttributesAcceptedByRM( + boolean areNodeAttributesAcceptedByRM) { + maybeInitBuilder(); + this.builder + .setAreNodeAttributesAcceptedByRM(areNodeAttributesAcceptedByRM); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7deef08e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 0b8c4a3..fb74237 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -72,6 +72,7 @@ message RegisterNodeManagerRequestProto { optional NodeLabelsProto nodeLabels = 8; optional ResourceProto physicalResource = 9; repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10; + optional NodeAttributesProto nodeAttributes = 11; } message RegisterNodeManagerResponseProto { @@ -83,6 +84,7 @@ message RegisterNodeManagerResponseProto { optional string rm_version = 6; optional bool areNodeLabelsAcceptedByRM = 7 [default = false]; optional ResourceProto resource = 8; + optional bool areNodeAttributesAcceptedByRM = 9 [default = false]; } message UnRegisterNodeManagerRequestProto { @@ -128,6 +130,7 @@ message NodeHeartbeatResponseProto { repeated AppCollectorDataProto app_collectors = 16; // to be used in place of containers_to_decrease repeated ContainerProto containers_to_update = 17; + optional bool areNodeAttributesAcceptedByRM = 18 [default = false]; } message ContainerQueuingLimitProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/7deef08e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3bb9f92..5975784 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -30,6 +30,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -377,6 +378,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements throws YarnException, IOException { RegisterNodeManagerResponse regNMResponse; Set<NodeLabel> nodeLabels = nodeLabelsHandler.getNodeLabelsForRegistration(); + Set<NodeAttribute> nodeAttributes = + nodeAttributesHandler.getNodeAttributesForRegistration(); // Synchronize NM-RM registration with // ContainerManagerImpl#increaseContainersResource and @@ -387,7 +390,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, nodeManagerVersionId, containerReports, getRunningApplications(), - nodeLabels, physicalResource); + nodeLabels, physicalResource, nodeAttributes); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); @@ -473,6 +476,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements successfullRegistrationMsg.append(nodeLabelsHandler .verifyRMRegistrationResponseForNodeLabels(regNMResponse)); + successfullRegistrationMsg.append(nodeAttributesHandler + .verifyRMRegistrationResponseForNodeAttributes(regNMResponse)); LOG.info(successfullRegistrationMsg.toString()); } @@ -875,34 +880,254 @@ public class NodeStatusUpdaterImpl extends AbstractService implements */ private NMNodeAttributesHandler createNMNodeAttributesHandler( NodeAttributesProvider provider) { - return provider == null ? null : - new NMDistributedNodeAttributesHandler(nodeAttributesProvider); + if (provider == null) { + return new NMCentralizedNodeAttributesHandler(); + } else { + return new NMDistributedNodeAttributesHandler(provider, this.getConfig()); + } + } + + private static abstract class CachedNodeDescriptorHandler<T> { + private final long resyncInterval; + private final T defaultValue; + private T previousValue; + private long lastSendMills = 0L; + private boolean isValueSented; + + CachedNodeDescriptorHandler(T defaultValue, + long resyncInterval) { + this.defaultValue = defaultValue; + this.resyncInterval = resyncInterval; + } + + public abstract T getValueFromProvider(); + + public T getValueForRegistration() { + T value = getValueFromProvider(); + if (defaultValue != null) { + value = (null == value) ? defaultValue : value; + } + previousValue = value; + try { + validate(value); + } catch (IOException e) { + value = null; + } + return value; + } + + public T getValueForHeartbeat() { + T value = getValueFromProvider(); + // if the provider returns null then consider default value are set + if (defaultValue != null) { + value = (null == value) ? defaultValue : value; + } + // take some action only on modification of value + boolean isValueUpdated = isValueUpdated(value); + + isValueSented = false; + // When value updated or resync time is elapsed will send again in + // heartbeat. + if (isValueUpdated || isResyncIntervalElapsed()) { + previousValue = value; + try { + validate(value); + isValueSented = true; + } catch (IOException e) { + // take previous value to replace invalid value, so that invalid + // value are not verified for every HB, and send empty set + // to RM to have same value which was earlier set. + value = null; + } finally { + // Set last send time in heartbeat + lastSendMills = System.currentTimeMillis(); + } + } else { + // if value have not changed then no need to send + value = null; + } + return value; + } + + /** + * This method checks resync interval is elapsed or not. + */ + public boolean isResyncIntervalElapsed() { + long elapsedTimeSinceLastSync = + System.currentTimeMillis() - lastSendMills; + if (elapsedTimeSinceLastSync > resyncInterval) { + return true; + } + return false; + } + + protected abstract void validate(T value) throws IOException; + + protected abstract boolean isValueUpdated(T value); + + public long getResyncInterval() { + return resyncInterval; + } + + public T getDefaultValue() { + return defaultValue; + } + + public T getPreviousValue() { + return previousValue; + } + + public long getLastSendMills() { + return lastSendMills; + } + + public boolean isValueSented() { + return isValueSented; + } } private interface NMNodeAttributesHandler { /** + * validates nodeAttributes From Provider and returns it to the caller. Also + * ensures that if provider returns null then empty set is considered + */ + Set<NodeAttribute> getNodeAttributesForRegistration(); + + /** * @return the node attributes of this node manager. */ Set<NodeAttribute> getNodeAttributesForHeartbeat(); + + /** + * @return RMRegistration Success message and on failure will log + * independently and returns empty string + */ + String verifyRMRegistrationResponseForNodeAttributes( + RegisterNodeManagerResponse regNMResponse); + + /** + * check whether if updated attributes sent to RM was accepted or not. + * @param response + */ + void verifyRMHeartbeatResponseForNodeAttributes( + NodeHeartbeatResponse response); + } + + + /** + * In centralized configuration, NM need not send Node attributes or process + * the response. + */ + private static class NMCentralizedNodeAttributesHandler + implements NMNodeAttributesHandler { + @Override + public Set<NodeAttribute> getNodeAttributesForHeartbeat() { + return null; + } + + @Override + public Set<NodeAttribute> getNodeAttributesForRegistration() { + return null; + } + + @Override + public void verifyRMHeartbeatResponseForNodeAttributes( + NodeHeartbeatResponse response) { + } + + @Override + public String verifyRMRegistrationResponseForNodeAttributes( + RegisterNodeManagerResponse regNMResponse) { + return ""; + } } private static class NMDistributedNodeAttributesHandler + extends CachedNodeDescriptorHandler<Set<NodeAttribute>> implements NMNodeAttributesHandler { private final NodeAttributesProvider attributesProvider; protected NMDistributedNodeAttributesHandler( - NodeAttributesProvider provider) { + NodeAttributesProvider provider, Configuration conf) { + super(Collections.unmodifiableSet(new HashSet<>(0)), + conf.getLong(YarnConfiguration.NM_NODE_ATTRIBUTES_RESYNC_INTERVAL, + YarnConfiguration.DEFAULT_NM_NODE_ATTRIBUTES_RESYNC_INTERVAL)); this.attributesProvider = provider; } @Override + public Set<NodeAttribute> getNodeAttributesForRegistration() { + return getValueForRegistration(); + } + + @Override public Set<NodeAttribute> getNodeAttributesForHeartbeat() { + return getValueForHeartbeat(); + } + + @Override + public Set<NodeAttribute> getValueFromProvider() { return attributesProvider.getDescriptors(); } - } + @Override + protected void validate(Set<NodeAttribute> nodeAttributes) + throws IOException { + try { + NodeLabelUtil.validateNodeAttributes(nodeAttributes); + } catch (IOException e) { + LOG.error( + "Invalid node attribute(s) from Provider : " + e.getMessage()); + throw e; + } + } + + @Override + protected boolean isValueUpdated(Set<NodeAttribute> value) { + return !NodeLabelUtil.isNodeAttributesEquals(getPreviousValue(), value); + } + + @Override + public String verifyRMRegistrationResponseForNodeAttributes( + RegisterNodeManagerResponse regNMResponse) { + StringBuilder successfulNodeAttributesRegistrationMsg = + new StringBuilder(); + if (regNMResponse.getAreNodeAttributesAcceptedByRM()) { + successfulNodeAttributesRegistrationMsg + .append(" and with following Node attribute(s) : {") + .append(getPreviousValue()).append("}"); + } else { + // case where provider is set but RM did not accept the node attributes + String errorMsgFromRM = regNMResponse.getDiagnosticsMessage(); + LOG.error("Node attributes sent from NM while registration were" + + " rejected by RM. " + ((errorMsgFromRM == null) ? + "Seems like RM is configured with Centralized Attributes." : + "And with message " + regNMResponse.getDiagnosticsMessage())); + } + return successfulNodeAttributesRegistrationMsg.toString(); + } + + @Override + public void verifyRMHeartbeatResponseForNodeAttributes( + NodeHeartbeatResponse response) { + if (isValueSented()) { + if (response.getAreNodeAttributesAcceptedByRM()) { + if(LOG.isDebugEnabled()){ + LOG.debug("Node attributes {" + getPreviousValue() + + "} were Accepted by RM "); + } + } else { + // case where updated node attributes from NodeAttributesProvider + // is sent to RM and RM rejected the attributes + LOG.error("NM node attributes {" + getPreviousValue() + + "} were not accepted by RM and message from RM : " + response + .getDiagnosticsMessage()); + } + } + } + } private static interface NMNodeLabelsHandler { /** @@ -963,33 +1188,22 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } private static class NMDistributedNodeLabelsHandler + extends CachedNodeDescriptorHandler<Set<NodeLabel>> implements NMNodeLabelsHandler { + private NMDistributedNodeLabelsHandler( NodeLabelsProvider nodeLabelsProvider, Configuration conf) { - this.nodeLabelsProvider = nodeLabelsProvider; - this.resyncInterval = + super(CommonNodeLabelsManager.EMPTY_NODELABEL_SET, conf.getLong(YarnConfiguration.NM_NODE_LABELS_RESYNC_INTERVAL, - YarnConfiguration.DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL); + YarnConfiguration.DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL)); + this.nodeLabelsProvider = nodeLabelsProvider; } private final NodeLabelsProvider nodeLabelsProvider; - private Set<NodeLabel> previousNodeLabels; - private boolean areLabelsSentToRM; - private long lastNodeLabelSendMills = 0L; - private final long resyncInterval; @Override public Set<NodeLabel> getNodeLabelsForRegistration() { - Set<NodeLabel> nodeLabels = nodeLabelsProvider.getDescriptors(); - nodeLabels = (null == nodeLabels) - ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET : nodeLabels; - previousNodeLabels = nodeLabels; - try { - validateNodeLabels(nodeLabels); - } catch (IOException e) { - nodeLabels = null; - } - return nodeLabels; + return getValueForRegistration(); } @Override @@ -999,7 +1213,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements if (regNMResponse.getAreNodeLabelsAcceptedByRM()) { successfulNodeLabelsRegistrationMsg .append(" and with following Node label(s) : {") - .append(StringUtils.join(",", previousNodeLabels)).append("}"); + .append(StringUtils.join(",", getPreviousValue())).append("}"); } else { // case where provider is set but RM did not accept the Node Labels String errorMsgFromRM = regNMResponse.getDiagnosticsMessage(); @@ -1014,50 +1228,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @Override public Set<NodeLabel> getNodeLabelsForHeartbeat() { - Set<NodeLabel> nodeLabelsForHeartbeat = - nodeLabelsProvider.getDescriptors(); - // if the provider returns null then consider empty labels are set - nodeLabelsForHeartbeat = (nodeLabelsForHeartbeat == null) - ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET - : nodeLabelsForHeartbeat; - // take some action only on modification of labels - boolean areNodeLabelsUpdated = - nodeLabelsForHeartbeat.size() != previousNodeLabels.size() - || !previousNodeLabels.containsAll(nodeLabelsForHeartbeat); - - areLabelsSentToRM = false; - // When nodelabels elapsed or resync time is elapsed will send again in - // heartbeat. - if (areNodeLabelsUpdated || isResyncIntervalElapsed()) { - previousNodeLabels = nodeLabelsForHeartbeat; - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Labels from provider: " - + StringUtils.join(",", previousNodeLabels)); - } - validateNodeLabels(nodeLabelsForHeartbeat); - areLabelsSentToRM = true; - } catch (IOException e) { - // set previous node labels to invalid set, so that invalid - // labels are not verified for every HB, and send empty set - // to RM to have same nodeLabels which was earlier set. - nodeLabelsForHeartbeat = null; - } finally { - // Set last send time in heartbeat - lastNodeLabelSendMills = System.currentTimeMillis(); - } - } else { - // if nodelabels have not changed then no need to send - nodeLabelsForHeartbeat = null; - } - return nodeLabelsForHeartbeat; + return getValueForHeartbeat(); } - private void validateNodeLabels(Set<NodeLabel> nodeLabelsForHeartbeat) + protected void validate(Set<NodeLabel> nodeLabels) throws IOException { - Iterator<NodeLabel> iterator = nodeLabelsForHeartbeat.iterator(); + Iterator<NodeLabel> iterator = nodeLabels.iterator(); boolean hasInvalidLabel = false; - StringBuilder errorMsg = new StringBuilder(""); + StringBuilder errorMsg = new StringBuilder(); while (iterator.hasNext()) { try { NodeLabelUtil @@ -1074,33 +1252,31 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } - /* - * This method checks resync interval is elapsed or not. - */ - public boolean isResyncIntervalElapsed() { - long elapsedTimeSinceLastSync = - System.currentTimeMillis() - lastNodeLabelSendMills; - if (elapsedTimeSinceLastSync > resyncInterval) { - return true; - } - return false; + @Override + public Set<NodeLabel> getValueFromProvider() { + return this.nodeLabelsProvider.getDescriptors(); + } + + @Override + protected boolean isValueUpdated(Set<NodeLabel> value) { + return !Objects.equals(value, getPreviousValue()); } @Override public void verifyRMHeartbeatResponseForNodeLabels( NodeHeartbeatResponse response) { - if (areLabelsSentToRM) { + if (isValueSented()) { if (response.getAreNodeLabelsAcceptedByRM()) { if(LOG.isDebugEnabled()){ LOG.debug( - "Node Labels {" + StringUtils.join(",", previousNodeLabels) + "Node Labels {" + StringUtils.join(",", getPreviousValue()) + "} were Accepted by RM "); } } else { // case where updated labels from NodeLabelsProvider is sent to RM and // RM rejected the labels LOG.error( - "NM node labels {" + StringUtils.join(",", previousNodeLabels) + "NM node labels {" + StringUtils.join(",", getPreviousValue()) + "} were not accepted by RM and message from RM : " + response.getDiagnosticsMessage()); } @@ -1120,7 +1296,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements Set<NodeLabel> nodeLabelsForHeartbeat = nodeLabelsHandler.getNodeLabelsForHeartbeat(); Set<NodeAttribute> nodeAttributesForHeartbeat = - nodeAttributesHandler == null ? null : nodeAttributesHandler.getNodeAttributesForHeartbeat(); NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID); NodeHeartbeatRequest request = @@ -1153,6 +1328,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements if (!handleShutdownOrResyncCommand(response)) { nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels( response); + nodeAttributesHandler + .verifyRMHeartbeatResponseForNodeAttributes(response); // Explicitly put this method after checking the resync // response. We http://git-wip-us.apache.org/repos/asf/hadoop/blob/7deef08e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForAttributes.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForAttributes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForAttributes.java new file mode 100644 index 0000000..325d60c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForAttributes.java @@ -0,0 +1,439 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.Thread.State; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.TimerTask; +import java.util.concurrent.TimeoutException; + +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.ServerSocketUtil; +import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; +import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil; +import org.apache.hadoop.yarn.server.api.ResourceTracker; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test NodeStatusUpdater for node attributes. + */ +public class TestNodeStatusUpdaterForAttributes extends NodeLabelTestBase { + private static final RecordFactory RECORD_FACTORY = + RecordFactoryProvider.getRecordFactory(null); + + private NodeManager nm; + private DummyNodeAttributesProvider dummyAttributesProviderRef; + + @Before + public void setup() { + dummyAttributesProviderRef = new DummyNodeAttributesProvider(); + } + + @After + public void tearDown() { + if (null != nm) { + ServiceOperations.stop(nm); + } + } + + private class ResourceTrackerForAttributes implements ResourceTracker { + private int heartbeatID = 0; + private Set<NodeAttribute> attributes; + + private boolean receivedNMHeartbeat = false; + private boolean receivedNMRegister = false; + + private MasterKey createMasterKey() { + MasterKey masterKey = new MasterKeyPBImpl(); + masterKey.setKeyId(123); + masterKey.setBytes( + ByteBuffer.wrap(new byte[] {new Integer(123).byteValue() })); + return masterKey; + } + + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnException, IOException { + attributes = request.getNodeAttributes(); + RegisterNodeManagerResponse response = + RECORD_FACTORY.newRecordInstance(RegisterNodeManagerResponse.class); + response.setNodeAction(NodeAction.NORMAL); + response.setContainerTokenMasterKey(createMasterKey()); + response.setNMTokenMasterKey(createMasterKey()); + response.setAreNodeAttributesAcceptedByRM(attributes != null); + synchronized (ResourceTrackerForAttributes.class) { + receivedNMRegister = true; + ResourceTrackerForAttributes.class.notifyAll(); + } + return response; + } + + public void waitTillHeartbeat() + throws InterruptedException, TimeoutException { + GenericTestUtils.waitFor(() -> receivedNMHeartbeat, 100, 30000); + if (!receivedNMHeartbeat) { + Assert.fail("Heartbeat is not received even after waiting"); + } + } + + public void waitTillRegister() + throws InterruptedException, TimeoutException { + GenericTestUtils.waitFor(() -> receivedNMRegister, 100, 30000); + if (!receivedNMRegister) { + Assert.fail("Registration is not received even after waiting"); + } + } + + /** + * Flag to indicate received any. + */ + public void resetNMHeartbeatReceiveFlag() { + synchronized (ResourceTrackerForAttributes.class) { + receivedNMHeartbeat = false; + } + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat( + NodeHeartbeatRequest request) { + attributes = request.getNodeAttributes(); + NodeStatus nodeStatus = request.getNodeStatus(); + nodeStatus.setResponseId(heartbeatID++); + + NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils + .newNodeHeartbeatResponse(heartbeatID, NodeAction.NORMAL, null, null, + null, null, 1000L); + + // to ensure that heartbeats are sent only when required. + nhResponse.setNextHeartBeatInterval(Long.MAX_VALUE); + nhResponse.setAreNodeAttributesAcceptedByRM(attributes != null); + + synchronized (ResourceTrackerForAttributes.class) { + receivedNMHeartbeat = true; + ResourceTrackerForAttributes.class.notifyAll(); + } + return nhResponse; + } + + @Override + public UnRegisterNodeManagerResponse unRegisterNodeManager( + UnRegisterNodeManagerRequest request) { + return null; + } + } + + /** + * A dummy NodeAttributesProvider class for tests. + */ + public static class DummyNodeAttributesProvider + extends NodeAttributesProvider { + + public DummyNodeAttributesProvider() { + super("DummyNodeAttributesProvider"); + // disable the fetch timer. + setIntervalTime(-1); + } + + @Override + protected void cleanUp() throws Exception { + // fake implementation, nothing to cleanup + } + + @Override + public TimerTask createTimerTask() { + return new TimerTask() { + @Override + public void run() { + setDescriptors(Collections.unmodifiableSet(new HashSet<>(0))); + } + }; + } + } + + private YarnConfiguration createNMConfigForDistributeNodeAttributes() { + YarnConfiguration conf = new YarnConfiguration(); + return conf; + } + + @Test(timeout = 20000) + public void testNodeStatusUpdaterForNodeAttributes() + throws InterruptedException, IOException, TimeoutException { + final ResourceTrackerForAttributes resourceTracker = + new ResourceTrackerForAttributes(); + nm = new NodeManager() { + @Override + protected NodeAttributesProvider createNodeAttributesProvider( + Configuration conf) throws IOException { + return dummyAttributesProviderRef; + } + + @Override + protected NodeStatusUpdater createNodeStatusUpdater( + Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker) { + + return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, + metrics) { + @Override + protected ResourceTracker getRMClient() { + return resourceTracker; + } + + @Override + protected void stopRMProxy() { + return; + } + }; + } + }; + + YarnConfiguration conf = createNMConfigForDistributeNodeAttributes(); + conf.setLong(YarnConfiguration.NM_NODE_ATTRIBUTES_RESYNC_INTERVAL, 2000); + conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, + "0.0.0.0:" + ServerSocketUtil.getPort(8040, 10)); + + nm.init(conf); + resourceTracker.resetNMHeartbeatReceiveFlag(); + nm.start(); + resourceTracker.waitTillRegister(); + assertTrue(NodeLabelUtil + .isNodeAttributesEquals(dummyAttributesProviderRef.getDescriptors(), + resourceTracker.attributes)); + + resourceTracker.waitTillHeartbeat(); // wait till the first heartbeat + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // heartbeat with updated attributes + NodeAttribute attribute1 = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1", + NodeAttributeType.STRING, "V1"); + dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute1)); + + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertTrue(NodeLabelUtil + .isNodeAttributesEquals(dummyAttributesProviderRef.getDescriptors(), + resourceTracker.attributes)); + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // heartbeat without updating attributes + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + resourceTracker.resetNMHeartbeatReceiveFlag(); + assertNull("If no change in attributes" + + " then null should be sent as part of request", + resourceTracker.attributes); + + // provider return with null attributes + dummyAttributesProviderRef.setDescriptors(null); + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertNotNull("If provider sends null" + + " then empty label set should be sent and not null", + resourceTracker.attributes); + assertTrue("If provider sends null then empty attributes should be sent", + resourceTracker.attributes.isEmpty()); + resourceTracker.resetNMHeartbeatReceiveFlag(); + // Since the resync interval is set to 2 sec in every alternate heartbeat + // the attributes will be send along with heartbeat. + // In loop we sleep for 1 sec + // so that every sec 1 heartbeat is send. + int nullAttributes = 0; + int nonNullAttributes = 0; + dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute1)); + for (int i = 0; i < 5; i++) { + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + if (null == resourceTracker.attributes) { + nullAttributes++; + } else { + Assert.assertTrue("In heartbeat PI attributes should be send", + NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(attribute1), + resourceTracker.attributes)); + nonNullAttributes++; + } + resourceTracker.resetNMHeartbeatReceiveFlag(); + Thread.sleep(1000); + } + Assert.assertTrue("More than one heartbeat with empty attributes expected", + nullAttributes > 1); + Assert.assertTrue("More than one heartbeat with attributes expected", + nonNullAttributes > 1); + nm.stop(); + } + + @Test(timeout = 20000) + public void testInvalidNodeAttributesFromProvider() + throws InterruptedException, IOException, TimeoutException { + final ResourceTrackerForAttributes resourceTracker = + new ResourceTrackerForAttributes(); + nm = new NodeManager() { + @Override protected NodeAttributesProvider createNodeAttributesProvider( + Configuration conf) throws IOException { + return dummyAttributesProviderRef; + } + + @Override protected NodeStatusUpdater createNodeStatusUpdater( + Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker) { + + return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, + metrics) { + @Override protected ResourceTracker getRMClient() { + return resourceTracker; + } + + @Override protected void stopRMProxy() { + return; + } + }; + } + }; + + YarnConfiguration conf = createNMConfigForDistributeNodeAttributes(); + conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, + "0.0.0.0:" + ServerSocketUtil.getPort(8040, 10)); + nm.init(conf); + resourceTracker.resetNMHeartbeatReceiveFlag(); + nm.start(); + resourceTracker.waitTillRegister(); + assertTrue(NodeLabelUtil + .isNodeAttributesEquals(dummyAttributesProviderRef.getDescriptors(), + resourceTracker.attributes)); + + resourceTracker.waitTillHeartbeat(); // wait till the first heartbeat + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // update attribute1 + NodeAttribute attribute1 = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1", + NodeAttributeType.STRING, "V1"); + dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute1)); + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertTrue(NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(attribute1), + resourceTracker.attributes)); + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // update attribute2 + NodeAttribute attribute2 = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2", + NodeAttributeType.STRING, "V2"); + dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute2)); + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertTrue(NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(attribute2), + resourceTracker.attributes)); + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // update attribute2 & attribute2 + dummyAttributesProviderRef + .setDescriptors(ImmutableSet.of(attribute1, attribute2)); + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertTrue(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(attribute1, attribute2), + resourceTracker.attributes)); + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // heartbeat with invalid attributes + NodeAttribute invalidAttribute = NodeAttribute + .newInstance("_.P", "Attr1", NodeAttributeType.STRING, "V1"); + dummyAttributesProviderRef + .setDescriptors(ImmutableSet.of(invalidAttribute)); + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertNull("On Invalid Attributes we need to retain earlier attributes, HB" + + " needs to send null", resourceTracker.attributes); + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // on next heartbeat same invalid attributes will be given by the provider, + // but again validation check and reset RM with invalid attributes set + // should not happen + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertNull("NodeStatusUpdater need not send repeatedly empty attributes on" + + " invalid attributes from provider ", resourceTracker.attributes); + resourceTracker.resetNMHeartbeatReceiveFlag(); + } + + /** + * This is to avoid race condition in the test case. NodeStatusUpdater + * heartbeat thread after sending the heartbeat needs some time to process the + * response and then go wait state. But in the test case once the main test + * thread returns back after resourceTracker.waitTillHeartbeat() we proceed + * with next sendOutofBandHeartBeat before heartbeat thread is blocked on + * wait. + * @throws InterruptedException + * @throws IOException + */ + private void sendOutofBandHeartBeat() + throws InterruptedException, IOException { + int i = 0; + do { + State statusUpdaterThreadState = + ((NodeStatusUpdaterImpl) nm.getNodeStatusUpdater()) + .getStatusUpdaterThreadState(); + if (statusUpdaterThreadState.equals(Thread.State.TIMED_WAITING) + || statusUpdaterThreadState.equals(Thread.State.WAITING)) { + nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + break; + } + if (++i <= 10) { + Thread.sleep(50); + } else { + throw new IOException("Waited for 500 ms" + + " but NodeStatusUpdaterThread not in waiting state"); + } + } while (true); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7deef08e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index b67172e..f3d8eb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; @@ -483,6 +484,22 @@ public class ResourceTrackerService extends AbstractService implements this.rmContext.getRMDelegatedNodeLabelsUpdater().updateNodeLabels(nodeId); } + // Update node's attributes to RM's NodeAttributesManager. + if (request.getNodeAttributes() != null) { + try { + // update node attributes if necessary then update heartbeat response + updateNodeAttributesIfNecessary(nodeId, request.getNodeAttributes()); + response.setAreNodeAttributesAcceptedByRM(true); + } catch (IOException ex) { + //ensure the error message is captured and sent across in response + String errorMsg = response.getDiagnosticsMessage() == null ? + ex.getMessage() : + response.getDiagnosticsMessage() + "\n" + ex.getMessage(); + response.setDiagnosticsMessage(errorMsg); + response.setAreNodeAttributesAcceptedByRM(false); + } + } + StringBuilder message = new StringBuilder(); message.append("NodeManager from node ").append(host).append("(cmPort: ") .append(cmPort).append(" httpPort: "); @@ -493,6 +510,10 @@ public class ResourceTrackerService extends AbstractService implements message.append(", node labels { ").append( StringUtils.join(",", nodeLabels) + " } "); } + if (response.getAreNodeAttributesAcceptedByRM()) { + message.append(", node attributes { ") + .append(request.getNodeAttributes() + " } "); + } LOG.info(message.toString()); response.setNodeAction(NodeAction.NORMAL); @@ -650,34 +671,72 @@ public class ResourceTrackerService extends AbstractService implements // 8. Get node's attributes and update node-to-attributes mapping // in RMNodeAttributeManager. - Set<NodeAttribute> nodeAttributes = request.getNodeAttributes(); - if (nodeAttributes != null && !nodeAttributes.isEmpty()) { - nodeAttributes.forEach(nodeAttribute -> - LOG.debug(nodeId.toString() + " ATTRIBUTE : " - + nodeAttribute.toString())); - - // Validate attributes - if (!nodeAttributes.stream().allMatch( - nodeAttribute -> NodeAttribute.PREFIX_DISTRIBUTED - .equals(nodeAttribute.getAttributeKey().getAttributePrefix()))) { - // All attributes must be in same prefix: nm.yarn.io. - // Since we have the checks in NM to make sure attributes reported - // in HB are with correct prefix, so it should not reach here. - LOG.warn("Reject invalid node attributes from host: " - + nodeId.toString() + ", attributes in HB must have prefix " - + NodeAttribute.PREFIX_DISTRIBUTED); - } else { - // Replace all distributed node attributes associated with this host - // with the new reported attributes in node attribute manager. - this.rmContext.getNodeAttributesManager() - .replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED, - ImmutableMap.of(nodeId.getHost(), nodeAttributes)); + if (request.getNodeAttributes() != null) { + try { + // update node attributes if necessary then update heartbeat response + updateNodeAttributesIfNecessary(nodeId, request.getNodeAttributes()); + nodeHeartBeatResponse.setAreNodeAttributesAcceptedByRM(true); + } catch (IOException ex) { + //ensure the error message is captured and sent across in response + String errorMsg = + nodeHeartBeatResponse.getDiagnosticsMessage() == null ? + ex.getMessage() : + nodeHeartBeatResponse.getDiagnosticsMessage() + "\n" + ex + .getMessage(); + nodeHeartBeatResponse.setDiagnosticsMessage(errorMsg); + nodeHeartBeatResponse.setAreNodeAttributesAcceptedByRM(false); } } return nodeHeartBeatResponse; } + /** + * Update node attributes if necessary. + * @param nodeId - node id + * @param nodeAttributes - node attributes + * @return true if updated + * @throws IOException if prefix type is not distributed + */ + private void updateNodeAttributesIfNecessary(NodeId nodeId, + Set<NodeAttribute> nodeAttributes) throws IOException { + if (LOG.isDebugEnabled()) { + nodeAttributes.forEach(nodeAttribute -> LOG.debug( + nodeId.toString() + " ATTRIBUTE : " + nodeAttribute.toString())); + } + + // Validate attributes + if (!nodeAttributes.stream().allMatch( + nodeAttribute -> NodeAttribute.PREFIX_DISTRIBUTED + .equals(nodeAttribute.getAttributeKey().getAttributePrefix()))) { + // All attributes must be in same prefix: nm.yarn.io. + // Since we have the checks in NM to make sure attributes reported + // in HB are with correct prefix, so it should not reach here. + throw new IOException("Reject invalid node attributes from host: " + + nodeId.toString() + ", attributes in HB must have prefix " + + NodeAttribute.PREFIX_DISTRIBUTED); + } + // Replace all distributed node attributes associated with this host + // with the new reported attributes in node attribute manager. + Set<NodeAttribute> currentNodeAttributes = + this.rmContext.getNodeAttributesManager() + .getAttributesForNode(nodeId.getHost()).keySet(); + if (!currentNodeAttributes.isEmpty()) { + currentNodeAttributes = NodeLabelUtil + .filterAttributesByPrefix(currentNodeAttributes, + NodeAttribute.PREFIX_DISTRIBUTED); + } + if (!NodeLabelUtil + .isNodeAttributesEquals(nodeAttributes, currentNodeAttributes)) { + this.rmContext.getNodeAttributesManager() + .replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED, + ImmutableMap.of(nodeId.getHost(), nodeAttributes)); + } else if (LOG.isDebugEnabled()) { + LOG.debug("Skip updating node attributes since there is no change for " + + nodeId + " : " + nodeAttributes); + } + } + private int getNextResponseId(int responseId) { // Loop between 0 and Integer.MAX_VALUE return (responseId + 1) & Integer.MAX_VALUE; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7deef08e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java index 83c5983..90cf110 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java @@ -221,10 +221,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { // Notify RM if (rmContext != null && rmContext.getDispatcher() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Updated NodeAttribute event to RM:" - + newNodeToAttributesMap.values()); - } + LOG.info("Updated NodeAttribute event to RM:" + + newNodeToAttributesMap); rmContext.getDispatcher().getEventHandler().handle( new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap)); } @@ -306,9 +304,11 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { for (NodeAttribute attribute : nodeToAttrMappingEntry.getValue()) { NodeAttributeKey attributeKey = attribute.getAttributeKey(); String attributeName = attributeKey.getAttributeName().trim(); - NodeLabelUtil.checkAndThrowLabelName(attributeName); + NodeLabelUtil.checkAndThrowAttributeName(attributeName); NodeLabelUtil .checkAndThrowAttributePrefix(attributeKey.getAttributePrefix()); + NodeLabelUtil + .checkAndThrowAttributeValue(attribute.getAttributeValue()); // ensure trimmed values are set back attributeKey.setAttributeName(attributeName); @@ -747,8 +747,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { // Notify RM if (rmContext != null && rmContext.getDispatcher() != null) { - LOG.info("Updated NodeAttribute event to RM:" + newNodeToAttributesMap - .values()); + LOG.info("Updated NodeAttribute event to RM:" + newNodeToAttributesMap); rmContext.getDispatcher().getEventHandler().handle( new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap)); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
