Repository: hadoop
Updated Branches:
  refs/heads/branch-3.2 9c89e2ea7 -> 7deef08eb


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7deef08e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index e40b3c0..13f071e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -18,7 +18,11 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -42,6 +46,7 @@ import java.util.Set;
 import java.util.HashSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.transform.Transformer;
 import javax.xml.transform.TransformerFactory;
@@ -113,6 +118,9 @@ import org.apache.hadoop.yarn.util.YarnVersionInfo;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 
@@ -730,6 +738,137 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     }
   }
 
+  /**
+   * Registers a node manager that reports two node attributes under the
+   * distributed prefix and checks that the RM replies with a NORMAL action,
+   * flags the attributes as accepted, and holds attributes for the host that
+   * match the ones sent in the request.
+   */
+  @Test
+  public void testNodeRegistrationWithAttributes() throws Exception {
+    writeToHostsFile("host2");
+
+    // Turn a freshly created temp file into a directory that serves as the
+    // root of the node attribute store for this test.
+    File attributeStoreRoot = File.createTempFile("nattr", ".tmp");
+    attributeStoreRoot.delete();
+    attributeStoreRoot.mkdirs();
+    attributeStoreRoot.deleteOnExit();
+
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+        FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
+    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
+        attributeStoreRoot.getAbsolutePath());
+    rm = new MockRM(conf);
+    rm.start();
+
+    // Build the registration request carrying both attributes.
+    NodeId nodeId = NodeId.newInstance("host2", 1234);
+    NodeAttribute firstAttribute = NodeAttribute.newInstance(
+        NodeAttribute.PREFIX_DISTRIBUTED, "Attr1",
+        NodeAttributeType.STRING, "V1");
+    NodeAttribute secondAttribute = NodeAttribute.newInstance(
+        NodeAttribute.PREFIX_DISTRIBUTED, "Attr2",
+        NodeAttributeType.STRING, "V2");
+    RegisterNodeManagerRequest request =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    request.setNodeId(nodeId);
+    request.setResource(BuilderUtils.newResource(1024, 1));
+    request.setHttpPort(1234);
+    request.setNMVersion(YarnVersionInfo.getVersion());
+    request.setNodeAttributes(toSet(firstAttribute, secondAttribute));
+
+    RegisterNodeManagerResponse response =
+        rm.getResourceTrackerService().registerNodeManager(request);
+
+    Assert.assertEquals("Action should be normal on valid Node Attributes",
+        NodeAction.NORMAL, response.getNodeAction());
+    // The attributes the RM now holds for the host must equal those reported.
+    Assert.assertTrue(NodeLabelUtil.isNodeAttributesEquals(
+        rm.getRMContext().getNodeAttributesManager()
+            .getAttributesForNode(nodeId.getHost()).keySet(),
+        request.getNodeAttributes()));
+    Assert.assertTrue("Valid Node Attributes were not accepted by RM",
+        response.getAreNodeAttributesAcceptedByRM());
+
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  /**
+   * Registers a node manager three times, each time pairing a valid node
+   * attribute with one invalid attribute (bad prefix, bad name, bad value).
+   * After every attempt the RM must hold no attributes for the host, answer
+   * with a NORMAL action, and describe the rejection in the diagnostics
+   * message.
+   */
+  @Test
+  public void testNodeRegistrationWithInvalidAttributes() throws Exception {
+    writeToHostsFile("host2");
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+        FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
+    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
+        TEMP_DIR.getAbsolutePath());
+    rm = new MockRM(conf);
+    rm.start();
+
+    ResourceTrackerService resourceTrackerService =
+        rm.getResourceTrackerService();
+    RegisterNodeManagerRequest req =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    NodeId nodeId = NodeId.newInstance("host2", 1234);
+    Resource capability = BuilderUtils.newResource(1024, 1);
+    NodeAttribute validNodeAttribute = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1",
+            NodeAttributeType.STRING, "V1");
+    NodeAttribute invalidPrefixNodeAttribute = NodeAttribute
+        .newInstance("_P", "Attr1",
+            NodeAttributeType.STRING, "V2");
+    NodeAttribute invalidNameNodeAttribute = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "_N",
+            NodeAttributeType.STRING, "V2");
+    NodeAttribute invalidValueNodeAttribute = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2",
+            NodeAttributeType.STRING, "...");
+    req.setResource(capability);
+    req.setNodeId(nodeId);
+    req.setHttpPort(1234);
+    req.setNMVersion(YarnVersionInfo.getVersion());
+
+    // Each scenario reuses the same request and only swaps the attribute set.
+    // The valid attribute travels with the invalid one, so a zero attribute
+    // count afterwards shows the whole set was rejected.
+
+    // check invalid prefix
+    req.setNodeAttributes(
+        toSet(validNodeAttribute, invalidPrefixNodeAttribute));
+    RegisterNodeManagerResponse response =
+        resourceTrackerService.registerNodeManager(req);
+    Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
+        .getAttributesForNode(nodeId.getHost()).size());
+    assertRegisterResponseForInvalidAttributes(response);
+    Assert.assertTrue(response.getDiagnosticsMessage()
+        .endsWith("attributes in HB must have prefix nm.yarn.io"));
+
+    // check invalid name
+    req.setNodeAttributes(toSet(validNodeAttribute, invalidNameNodeAttribute));
+    response = resourceTrackerService.registerNodeManager(req);
+    Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
+        .getAttributesForNode(nodeId.getHost()).size());
+    assertRegisterResponseForInvalidAttributes(response);
+    Assert.assertTrue(response.getDiagnosticsMessage()
+        .startsWith("attribute name should only contains"));
+
+    // check invalid value
+    req.setNodeAttributes(toSet(validNodeAttribute, invalidValueNodeAttribute));
+    response = resourceTrackerService.registerNodeManager(req);
+    Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
+        .getAttributesForNode(nodeId.getHost()).size());
+    assertRegisterResponseForInvalidAttributes(response);
+    Assert.assertTrue(response.getDiagnosticsMessage()
+        .startsWith("attribute value should only contains"));
+
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  /**
+   * Checks the registration response returned for a request that carried
+   * invalid node attributes: the node action is still NORMAL, a diagnostics
+   * message is present, and the attributes are reported as not accepted.
+   *
+   * @param response registration response to verify
+   */
+  private void assertRegisterResponseForInvalidAttributes(
+      RegisterNodeManagerResponse response) {
+    Assert.assertEquals(
+        "On Invalid Node Attributes action is expected to be normal",
+        NodeAction.NORMAL, response.getNodeAction());
+    Assert.assertNotNull(response.getDiagnosticsMessage());
+    // Check the attribute flag, not the node-label flag: the label flag is
+    // unrelated to attribute validation and would make this assertion vacuous.
+    Assert.assertFalse(
+        "Node Attributes should not be accepted by RM If Invalid",
+        response.getAreNodeAttributesAcceptedByRM());
+  }
+
   private NodeStatus getNodeStatusObject(NodeId nodeId) {
     NodeStatus status = Records.newRecord(NodeStatus.class);
     status.setNodeId(nodeId);
@@ -832,12 +971,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
         hostFile.getAbsolutePath());
     conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
         FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
-    File tempDir = File.createTempFile("nattr", ".tmp");
-    tempDir.delete();
-    tempDir.mkdirs();
-    tempDir.deleteOnExit();
     conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
-        tempDir.getAbsolutePath());
+        TEMP_DIR.getAbsolutePath());
     rm = new MockRM(conf);
     rm.start();
 
@@ -906,6 +1041,287 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
   }
 
   @Test
+  public void testNodeHeartbeatWithInvalidNodeAttributes() throws Exception {
+    writeToHostsFile("host2");
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+        FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
+    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
+        TEMP_DIR.getAbsolutePath());
+    rm = new MockRM(conf);
+    rm.start();
+
+    // Register to RM
+    ResourceTrackerService resourceTrackerService =
+        rm.getResourceTrackerService();
+    RegisterNodeManagerRequest registerReq =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    NodeId nodeId = NodeId.newInstance("host2", 1234);
+    Resource capability = BuilderUtils.newResource(1024, 1);
+    registerReq.setResource(capability);
+    registerReq.setNodeId(nodeId);
+    registerReq.setHttpPort(1234);
+    registerReq.setNMVersion(YarnVersionInfo.getVersion());
+    RegisterNodeManagerResponse registerResponse =
+        resourceTrackerService.registerNodeManager(registerReq);
+
+    NodeAttribute validNodeAttribute = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "host",
+            NodeAttributeType.STRING, "host2");
+    NodeAttribute invalidPrefixNodeAttribute = NodeAttribute
+        .newInstance("_P", "Attr1",
+            NodeAttributeType.STRING, "V2");
+    NodeAttribute invalidNameNodeAttribute = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "_N",
+            NodeAttributeType.STRING, "V2");
+    NodeAttribute invalidValueNodeAttribute = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2",
+            NodeAttributeType.STRING, "...");
+
+    // Set node attributes in HB.
+    NodeHeartbeatRequest heartbeatReq =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    NodeStatus nodeStatusObject = getNodeStatusObject(nodeId);
+    int responseId = nodeStatusObject.getResponseId();
+    heartbeatReq.setNodeStatus(nodeStatusObject);
+    heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
+        .getNMTokenMasterKey());
+    heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
+        .getContainerTokenMasterKey());
+    heartbeatReq.setNodeAttributes(toSet(validNodeAttribute));
+
+    // Send first HB to RM with invalid prefix node attributes
+    heartbeatReq.setNodeAttributes(
+        toSet(validNodeAttribute, invalidPrefixNodeAttribute));
+    NodeHeartbeatResponse response =
+        resourceTrackerService.nodeHeartbeat(heartbeatReq);
+    Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
+        .getAttributesForNode(nodeId.getHost()).size());
+    assertNodeHeartbeatResponseForInvalidAttributes(response);
+    Assert.assertTrue(response.getDiagnosticsMessage()
+        .endsWith("attributes in HB must have prefix nm.yarn.io"));
+
+    // Send another HB to RM with invalid name node attributes
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq
+        .setNodeAttributes(toSet(validNodeAttribute, 
invalidNameNodeAttribute));
+    response = resourceTrackerService.nodeHeartbeat(heartbeatReq);
+    Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
+        .getAttributesForNode(nodeId.getHost()).size());
+    assertNodeHeartbeatResponseForInvalidAttributes(response);
+    Assert.assertTrue(response.getDiagnosticsMessage()
+        .startsWith("attribute name should only contains"));
+
+    // Send another HB to RM with invalid value node attributes
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq.setNodeAttributes(
+        toSet(validNodeAttribute, invalidValueNodeAttribute));
+    response = resourceTrackerService.nodeHeartbeat(heartbeatReq);
+    Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
+        .getAttributesForNode(nodeId.getHost()).size());
+    assertNodeHeartbeatResponseForInvalidAttributes(response);
+    Assert.assertTrue(response.getDiagnosticsMessage()
+        .startsWith("attribute value should only contains"));
+
+    // Send another HB to RM with updated node attribute
+    NodeAttribute updatedNodeAttribute = NodeAttribute.newInstance(
+        NodeAttribute.PREFIX_DISTRIBUTED, "host",
+        NodeAttributeType.STRING, "host3");
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq.setNodeAttributes(toSet(updatedNodeAttribute));
+    resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    // Make sure RM gets the updated attribute
+    NodeAttributesManager attributeManager =
+        rm.getRMContext().getNodeAttributesManager();
+    Map<NodeAttribute, AttributeValue> attrs =
+        attributeManager.getAttributesForNode(nodeId.getHost());
+    Assert.assertEquals(1, attrs.size());
+    NodeAttribute na = attrs.keySet().iterator().next();
+    Assert.assertEquals("host", na.getAttributeKey().getAttributeName());
+    Assert.assertEquals("host3", na.getAttributeValue());
+    Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType());
+  }
+
+  /**
+   * Checks the heartbeat response returned for a heartbeat that carried
+   * invalid node attributes: the node action is still NORMAL, a diagnostics
+   * message is present, and the attributes are reported as not accepted.
+   *
+   * @param response heartbeat response to verify
+   */
+  private void assertNodeHeartbeatResponseForInvalidAttributes(
+      NodeHeartbeatResponse response) {
+    Assert.assertEquals(
+        "On Invalid Node Attributes action is expected to be normal",
+        NodeAction.NORMAL, response.getNodeAction());
+    Assert.assertNotNull(response.getDiagnosticsMessage());
+    // Check the attribute flag, not the node-label flag: the label flag is
+    // unrelated to attribute validation and would make this assertion vacuous.
+    Assert.assertFalse(
+        "Node Attributes should not be accepted by RM If Invalid",
+        response.getAreNodeAttributesAcceptedByRM());
+  }
+
+  /**
+   * Verifies that heartbeats trigger a node attribute replacement in the RM
+   * only when the reported attributes differ from what the RM already holds.
+   * The node attributes manager is wrapped in a spy whose
+   * {@code replaceNodeAttributes} bumps a counter before delegating to the
+   * real manager; the counter is checked after each heartbeat and after a
+   * direct replacement of centralized attributes.
+   */
+  @Test
+  public void testNodeHeartbeatOnlyUpdateNodeAttributesIfNeeded()
+      throws Exception {
+    writeToHostsFile("host2");
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+        NullNodeAttributeStore.class, NodeAttributeStore.class);
+    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
+        TEMP_DIR.getAbsolutePath());
+    rm = new MockRM(conf);
+    rm.start();
+
+    // spy node attributes manager
+    NodeAttributesManager tmpAttributeManager =
+        rm.getRMContext().getNodeAttributesManager();
+    NodeAttributesManager spyAttributeManager = spy(tmpAttributeManager);
+    rm.getRMContext().setNodeAttributesManager(spyAttributeManager);
+    // Counts every replaceNodeAttributes call that reaches the spy.
+    AtomicInteger count = new AtomicInteger(0);
+    Mockito.doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Exception {
+        count.incrementAndGet();
+        tmpAttributeManager
+            .replaceNodeAttributes((String) invocation.getArguments()[0],
+                (Map<String, Set<NodeAttribute>>) invocation.getArguments()[1]);
+        return null;
+      }
+    }).when(spyAttributeManager)
+        .replaceNodeAttributes(Mockito.any(String.class),
+            Mockito.any(Map.class));
+
+    // Register to RM
+    ResourceTrackerService resourceTrackerService =
+        rm.getResourceTrackerService();
+    RegisterNodeManagerRequest registerReq =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    NodeId nodeId = NodeId.newInstance("host2", 1234);
+    Resource capability = BuilderUtils.newResource(1024, 1);
+    registerReq.setResource(capability);
+    registerReq.setNodeId(nodeId);
+    registerReq.setHttpPort(1234);
+    registerReq.setNMVersion(YarnVersionInfo.getVersion());
+    RegisterNodeManagerResponse registerResponse =
+        resourceTrackerService.registerNodeManager(registerReq);
+
+    Set<NodeAttribute> nodeAttributes = new HashSet<>();
+    nodeAttributes.add(NodeAttribute.newInstance(
+        NodeAttribute.PREFIX_DISTRIBUTED, "host",
+        NodeAttributeType.STRING, "host2"));
+
+    // Set node attributes in HB.
+    NodeHeartbeatRequest heartbeatReq =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    NodeStatus nodeStatusObject = getNodeStatusObject(nodeId);
+    int responseId = nodeStatusObject.getResponseId();
+    heartbeatReq.setNodeStatus(nodeStatusObject);
+    heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
+        .getNMTokenMasterKey());
+    heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
+        .getContainerTokenMasterKey());
+    heartbeatReq.setNodeAttributes(nodeAttributes);
+    resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    // Ensure RM gets correct node attributes update.
+    Map<NodeAttribute, AttributeValue> attrs = spyAttributeManager
+        .getAttributesForNode(nodeId.getHost());
+    // NOTE(review): the result of this call is discarded; kept as in the
+    // original, confirm whether it is still needed.
+    spyAttributeManager.getNodesToAttributes(ImmutableSet.of(nodeId.getHost()));
+    Assert.assertEquals(1, attrs.size());
+    NodeAttribute na = attrs.keySet().iterator().next();
+    Assert.assertEquals("host", na.getAttributeKey().getAttributeName());
+    Assert.assertEquals("host2", na.getAttributeValue());
+    Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType());
+    Assert.assertEquals(1, count.get());
+
+    // Send HBs to RM with the same node attributes
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq.setNodeStatus(nodeStatusObject);
+    resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq.setNodeStatus(nodeStatusObject);
+    resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    // Make sure RM updated node attributes once
+    Assert.assertEquals(1, count.get());
+
+    // Send another HB to RM with updated node attributes
+    nodeAttributes.clear();
+    nodeAttributes.add(NodeAttribute.newInstance(
+        NodeAttribute.PREFIX_DISTRIBUTED, "host",
+        NodeAttributeType.STRING, "host3"));
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq.setNodeStatus(nodeStatusObject);
+    heartbeatReq.setNodeAttributes(nodeAttributes);
+    resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    // Make sure RM gets the updated attribute
+    attrs = spyAttributeManager.getAttributesForNode(nodeId.getHost());
+    Assert.assertEquals(1, attrs.size());
+    na = attrs.keySet().iterator().next();
+    Assert.assertEquals("host", na.getAttributeKey().getAttributeName());
+    Assert.assertEquals("host3", na.getAttributeValue());
+    Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType());
+
+    // Make sure RM updated node attributes twice
+    Assert.assertEquals(2, count.get());
+
+    // Add centralized attributes
+    Map<String, Set<NodeAttribute>> nodeAttributeMapping = ImmutableMap
+        .of(nodeId.getHost(), ImmutableSet.of(NodeAttribute.newInstance(
+            NodeAttribute.PREFIX_CENTRALIZED, "centAttr",
+            NodeAttributeType.STRING, "x")));
+    spyAttributeManager.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
+        nodeAttributeMapping);
+
+    // Make sure RM updated node attributes three times
+    Assert.assertEquals(3, count.get());
+
+    // Send another HB to RM with non-updated node attributes
+    nodeAttributes.clear();
+    nodeAttributes.add(NodeAttribute.newInstance(
+        NodeAttribute.PREFIX_DISTRIBUTED, "host",
+        NodeAttributeType.STRING, "host3"));
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq.setNodeStatus(nodeStatusObject);
+    heartbeatReq.setNodeAttributes(nodeAttributes);
+    resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    // Make sure RM still updated node attributes three times
+    Assert.assertEquals(3, count.get());
+
+    // Send another HB to RM with updated node attributes
+    nodeAttributes.clear();
+    nodeAttributes.add(NodeAttribute.newInstance(
+        NodeAttribute.PREFIX_DISTRIBUTED, "host",
+        NodeAttributeType.STRING, "host4"));
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq.setNodeStatus(nodeStatusObject);
+    heartbeatReq.setNodeAttributes(nodeAttributes);
+    resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    // Make sure RM gets the updated attribute
+    attrs = spyAttributeManager.getAttributesForNode(nodeId.getHost());
+    Assert.assertEquals(2, attrs.size());
+    attrs.keySet().stream().forEach(e -> {
+      Assert.assertEquals(NodeAttributeType.STRING, e.getAttributeType());
+      // Compare prefixes by value: with reference equality both branches
+      // are skipped whenever the stored prefix is a different String
+      // instance, and the assertions below would never run.
+      if (NodeAttribute.PREFIX_DISTRIBUTED
+          .equals(e.getAttributeKey().getAttributePrefix())) {
+        Assert.assertEquals("host", e.getAttributeKey().getAttributeName());
+        Assert.assertEquals("host4", e.getAttributeValue());
+      } else if (NodeAttribute.PREFIX_CENTRALIZED
+          .equals(e.getAttributeKey().getAttributePrefix())) {
+        Assert.assertEquals("centAttr", e.getAttributeKey().getAttributeName());
+        Assert.assertEquals("x", e.getAttributeValue());
+      }
+    });
+
+    // Make sure RM updated node attributes four times
+    Assert.assertEquals(4, count.get());
+
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  @Test
   public void testNodeHeartBeatWithInvalidLabels() throws Exception {
     writeToHostsFile("host2");
     Configuration conf = new Configuration();
@@ -2402,4 +2818,34 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
     Assert.assertEquals(1, nodeHeartbeat.getResponseId());
   }
+
+  /**
+   * A no-op implementation of NodeAttributeStore for testing.
+   * Every method has an empty body, so nothing is written, read back or
+   * released. It is plugged in through
+   * {@code YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS} by
+   * {@code testNodeHeartbeatOnlyUpdateNodeAttributesIfNeeded}.
+   */
+  public static class NullNodeAttributeStore implements NodeAttributeStore {
+
+    // Intentionally empty: replacements are discarded.
+    @Override
+    public void replaceNodeAttributes(List<NodeToAttributes> nodeToAttribute) {
+    }
+
+    // Intentionally empty: additions are discarded.
+    @Override
+    public void addNodeAttributes(List<NodeToAttributes> nodeToAttribute) {
+    }
+
+    // Intentionally empty: removals are discarded.
+    @Override
+    public void removeNodeAttributes(List<NodeToAttributes> nodeToAttribute) {
+    }
+
+    // Intentionally empty: neither the configuration nor the manager is used.
+    @Override
+    public void init(Configuration configuration, NodeAttributesManager mgr) {
+    }
+
+    // Intentionally empty: there is no stored state to recover.
+    @Override
+    public void recover() {
+    }
+
+    // Intentionally empty: no resources are held.
+    @Override
+    public void close() {
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to