Repository: hadoop
Updated Branches:
  refs/heads/branch-2 f6b0fcdc7 -> 28edc7b12


YARN-3964. Support NodeLabelsProvider at Resource Manager side.
Contributed by Dian Fu.

(cherry picked from commit db9304788187c700647c4d84caeb3b5ad6d868d8)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/28edc7b1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/28edc7b1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/28edc7b1

Branch: refs/heads/branch-2
Commit: 28edc7b1296c2cead1c290c377db2881b9dbb9fd
Parents: f6b0fcd
Author: Devaraj K <deva...@apache.org>
Authored: Sun Oct 11 11:21:29 2015 +0530
Committer: Devaraj K <deva...@apache.org>
Committed: Sun Oct 11 11:24:05 2015 +0530

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  40 +++-
 .../nodelabels/CommonNodeLabelsManager.java     |  18 +-
 .../src/main/resources/yarn-default.xml         |  28 ++-
 .../server/resourcemanager/AdminService.java    |  26 +--
 .../resourcemanager/RMActiveServiceContext.java |  15 ++
 .../yarn/server/resourcemanager/RMContext.java  |   6 +
 .../server/resourcemanager/RMContextImpl.java   |  13 ++
 .../server/resourcemanager/ResourceManager.java |  22 ++
 .../resourcemanager/ResourceTrackerService.java |  25 +--
 .../nodelabels/NodeLabelsUtils.java             |  59 ++++++
 .../RMDelegatedNodeLabelsUpdater.java           | 211 +++++++++++++++++++
 .../nodelabels/RMNodeLabelsMappingProvider.java |  45 ++++
 .../resourcemanager/webapp/RMWebServices.java   |  21 +-
 .../resourcemanager/TestRMAdminService.java     |   8 +-
 .../TestResourceTrackerService.java             |   5 +-
 .../TestRMDelegatedNodeLabelsUpdater.java       | 163 ++++++++++++++
 .../webapp/TestRMWebServicesNodeLabels.java     |   4 +-
 18 files changed, 648 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f278b98..8ef524f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -174,6 +174,9 @@ Release 2.8.0 - UNRELEASED
     YARN-261. Ability to fail AM attempts (Andrey Klochkov and
     Rohith Sharma K S via jlowe)
 
+    YARN-3964. Support NodeLabelsProvider at Resource Manager side.
+    (Dian Fu via devaraj)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 654a493..3af0c62 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1999,14 +1999,17 @@ public class YarnConfiguration extends Configuration {
   public static final String NODELABEL_CONFIGURATION_TYPE =
       NODE_LABELS_PREFIX + "configuration-type";
   
-  public static final String CENTALIZED_NODELABEL_CONFIGURATION_TYPE =
+  public static final String CENTRALIZED_NODELABEL_CONFIGURATION_TYPE =
       "centralized";
-  
+
+  public static final String DELEGATED_CENTALIZED_NODELABEL_CONFIGURATION_TYPE 
=
+      "delegated-centralized";
+
   public static final String DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE =
       "distributed";
   
   public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
-      CENTALIZED_NODELABEL_CONFIGURATION_TYPE;
+      CENTRALIZED_NODELABEL_CONFIGURATION_TYPE;
 
   public static final String MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY =
       YARN_PREFIX + "cluster.max-application-priority";
@@ -2019,6 +2022,20 @@ public class YarnConfiguration extends Configuration {
         NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE));
   }
 
+  @Private
+  public static boolean isCentralizedNodeLabelConfiguration(
+      Configuration conf) {
+    return CENTRALIZED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get(
+        NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE));
+  }
+
+  @Private
+  public static boolean isDelegatedCentralizedNodeLabelConfiguration(
+      Configuration conf) {
+    return DELEGATED_CENTALIZED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get(
+        NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE));
+  }
+
   private static final String NM_NODE_LABELS_PREFIX = NM_PREFIX
       + "node-labels.";
 
@@ -2055,6 +2072,23 @@ public class YarnConfiguration extends Configuration {
   public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS =
       NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels";
 
+  private static final String RM_NODE_LABELS_PREFIX = RM_PREFIX
+      + "node-labels.";
+
+  public static final String RM_NODE_LABELS_PROVIDER_CONFIG =
+      RM_NODE_LABELS_PREFIX + "provider";
+
+  private static final String RM_NODE_LABELS_PROVIDER_PREFIX =
+      RM_NODE_LABELS_PREFIX + "provider.";
+
+  //If -1 is configured then no timer task should be created
+  public static final String RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
+      RM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms";
+
+  //once in 30 mins
+  public static final long DEFAULT_RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
+      30 * 60 * 1000;
+
   public static final String AM_BLACKLISTING_ENABLED =
       YARN_PREFIX + "am.blacklisting.enabled";
   public static final boolean DEFAULT_AM_BLACKLISTING_ENABLED = true;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
index deec6ab..ef5462f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
@@ -101,7 +101,7 @@ public class CommonNodeLabelsManager extends 
AbstractService {
   protected NodeLabelsStore store;
   private boolean nodeLabelsEnabled = false;
 
-  private boolean isDistributedNodeLabelConfiguration = false;
+  private boolean isCentralizedNodeLabelConfiguration = true;
 
   /**
    * A <code>Host</code> can have multiple <code>Node</code>s 
@@ -220,16 +220,16 @@ public class CommonNodeLabelsManager extends 
AbstractService {
         conf.getBoolean(YarnConfiguration.NODE_LABELS_ENABLED,
             YarnConfiguration.DEFAULT_NODE_LABELS_ENABLED);
 
-    isDistributedNodeLabelConfiguration  =
-        YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
-    
+    isCentralizedNodeLabelConfiguration  =
+        YarnConfiguration.isCentralizedNodeLabelConfiguration(conf);
+
     labelCollections.put(NO_LABEL, new RMNodeLabel(NO_LABEL));
   }
 
   protected void initNodeLabelStore(Configuration conf) throws Exception {
     this.store = new FileSystemNodeLabelsStore(this);
     this.store.init(conf);
-    this.store.recover(isDistributedNodeLabelConfiguration);
+    this.store.recover(!isCentralizedNodeLabelConfiguration);
   }
 
   // for UT purpose
@@ -624,10 +624,14 @@ public class CommonNodeLabelsManager extends 
AbstractService {
       }
     }
     
-    if (null != dispatcher && !isDistributedNodeLabelConfiguration) {
-      // In case of DistributedNodeLabelConfiguration, no need to save the the
+    if (null != dispatcher && isCentralizedNodeLabelConfiguration) {
+      // In case of DistributedNodeLabelConfiguration or
+      // DelegatedCentralizedNodeLabelConfiguration, no need to save the the
       // NodeLabels Mapping to the back-end store, as on RM restart/failover
       // NodeLabels are collected from NM through Register/Heartbeat again
+      // in case of DistributedNodeLabelConfiguration and collected from
+      // RMNodeLabelsMappingProvider in case of
+      // DelegatedCentralizedNodeLabelConfiguration
       dispatcher.getEventHandler().handle(
           new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 53793d2..e3d2c69 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2114,7 +2114,7 @@
   <property>
     <description>
     Set configuration type for node labels. Administrators can specify
-    "centralized" or "distributed".
+    "centralized", "delegated-centralized" or "distributed".
     </description>
     <name>yarn.node-labels.configuration-type</name>
     <value>centralized</value>
@@ -2176,6 +2176,32 @@
     <name>yarn.nodemanager.node-labels.provider.fetch-timeout-ms</name>
     <value>1200000</value>
   </property>
+
+  <!-- Delegated-centralized Node Labels Configuration -->
+  <property>
+    <description>
+    When node labels "yarn.node-labels.configuration-type" is
+    of type "delegated-centralized", administrators should configure
+    the class for fetching node labels by ResourceManager. Configured
+    class needs to extend
+    org.apache.hadoop.yarn.server.resourcemanager.nodelabels.
+    RMNodeLabelsMappingProvider.
+    </description>
+    <name>yarn.resourcemanager.node-labels.provider</name>
+    <value></value>
+  </property>
+
+  <property>
+    <description>
+    When node labels "yarn.node-labels.configuration-type" is of type
+    "delegated-centralized" then periodically node labels are retrieved
+    from the node labels provider. This configuration is to define the
+    interval. If -1 is configured then node labels are retrieved from
+    provider only once for each node after it registers. Defaults to 30 mins.
+    </description>
+    <name>yarn.resourcemanager.node-labels.provider.fetch-interval-ms</name>
+    <value>1800000</value>
+  </property>
   <!-- Other Configuration -->
 
   <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index ab46419..353e72d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -86,6 +86,7 @@ import 
org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequ
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
 import 
org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -120,7 +121,7 @@ public class AdminService extends CompositeService 
implements
   private UserGroupInformation daemonUser;
 
   @VisibleForTesting
-  boolean isDistributedNodeLabelConfiguration = false;
+  boolean isCentralizedNodeLabelConfiguration = true;
 
   public AdminService(ResourceManager rm, RMContext rmContext) {
     super(AdminService.class.getName());
@@ -151,8 +152,8 @@ public class AdminService extends CompositeService 
implements
         .getCurrentUser());
     rmId = conf.get(YarnConfiguration.RM_HA_ID);
 
-    isDistributedNodeLabelConfiguration =
-        YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
+    isCentralizedNodeLabelConfiguration =
+        YarnConfiguration.isCentralizedNodeLabelConfiguration(conf);
 
     super.serviceInit(conf);
   }
@@ -745,7 +746,13 @@ public class AdminService extends CompositeService 
implements
     String operation = "replaceLabelsOnNode";
     final String msg = "set node to labels.";
 
-    checkAndThrowIfDistributedNodeLabelConfEnabled(operation);
+    try {
+      NodeLabelsUtils.verifyCentralizedNodeLabelConfEnabled(operation,
+          isCentralizedNodeLabelConfiguration);
+    } catch (IOException ioe) {
+      throw RPCUtil.getRemoteException(ioe);
+    }
+
     UserGroupInformation user = checkAcls(operation);
 
     checkRMStatus(user.getShortUserName(), operation, msg);
@@ -780,17 +787,6 @@ public class AdminService extends CompositeService 
implements
     return RPCUtil.getRemoteException(exception);
   }
 
-  private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation)
-      throws YarnException {
-    if (isDistributedNodeLabelConfiguration) {
-      String msg =
-          String.format("Error when invoke method=%s because of "
-              + "distributed node label configuration enabled.", operation);
-      LOG.error(msg);
-      throw RPCUtil.getRemoteException(new IOException(msg));
-    }
-  }
-
   @Override
   public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
       CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index c71323f..dd2153a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import 
org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import 
org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
 import 
org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -94,6 +95,7 @@ public class RMActiveServiceContext {
   private ResourceTrackerService resourceTrackerService;
   private ApplicationMasterService applicationMasterService;
   private RMNodeLabelsManager nodeLabelManager;
+  private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater;
   private long epoch;
   private Clock systemClock = new SystemClock();
   private long schedulerRecoveryStartTime = 0;
@@ -392,6 +394,19 @@ public class RMActiveServiceContext {
 
   @Private
   @Unstable
+  public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
+    return rmDelegatedNodeLabelsUpdater;
+  }
+
+  @Private
+  @Unstable
+  public void setRMDelegatedNodeLabelsUpdater(
+      RMDelegatedNodeLabelsUpdater nodeLablesUpdater) {
+    rmDelegatedNodeLabelsUpdater = nodeLablesUpdater;
+  }
+
+  @Private
+  @Unstable
   public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
     this.schedulerRecoveryStartTime = systemClock.getTime();
     this.schedulerRecoveryWaitTime = waitTime;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index b64c834..9802a37 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import 
org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import 
org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
 import 
org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -118,6 +119,11 @@ public interface RMContext {
   
   public void setNodeLabelManager(RMNodeLabelsManager mgr);
 
+  RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater();
+
+  void setRMDelegatedNodeLabelsUpdater(
+      RMDelegatedNodeLabelsUpdater nodeLabelsUpdater);
+
   long getEpoch();
 
   ReservationSystem getReservationSystem();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 840cea7..ed9942b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import 
org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import 
org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
 import 
org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -400,6 +401,18 @@ public class RMContextImpl implements RMContext {
     activeServiceContext.setNodeLabelManager(mgr);
   }
 
+  @Override
+  public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
+    return activeServiceContext.getRMDelegatedNodeLabelsUpdater();
+  }
+
+  @Override
+  public void setRMDelegatedNodeLabelsUpdater(
+      RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater) {
+    activeServiceContext.setRMDelegatedNodeLabelsUpdater(
+        delegatedNodeLabelsUpdater);
+  }
+
   public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
     activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index d1f339a..b38f188 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -62,6 +62,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublis
 import 
org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -440,6 +441,13 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
       addService(nlm);
       rmContext.setNodeLabelManager(nlm);
 
+      RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
+          createRMDelegatedNodeLabelsUpdater();
+      if (delegatedNodeLabelsUpdater != null) {
+        addService(delegatedNodeLabelsUpdater);
+        rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
+      }
+
       boolean isRecoveryEnabled = conf.getBoolean(
           YarnConfiguration.RECOVERY_ENABLED,
           YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
@@ -1113,6 +1121,20 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
     return new RMSecretManagerService(conf, rmContext);
   }
 
+  /**
+   * Create RMDelegatedNodeLabelsUpdater based on configuration.
+   */
+  protected RMDelegatedNodeLabelsUpdater createRMDelegatedNodeLabelsUpdater() {
+    if (conf.getBoolean(YarnConfiguration.NODE_LABELS_ENABLED,
+            YarnConfiguration.DEFAULT_NODE_LABELS_ENABLED)
+        && YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(
+            conf)) {
+      return new RMDelegatedNodeLabelsUpdater(rmContext);
+    } else {
+      return null;
+    }
+  }
+
   @Private
   public ClientRMService getClientRMService() {
     return this.clientRM;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 248cdc6..d6c7883 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -22,7 +22,6 @@ import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
@@ -43,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -63,6 +61,7 @@ import 
org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRe
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
@@ -105,6 +104,7 @@ public class ResourceTrackerService extends AbstractService 
implements
   private int minAllocVcores;
 
   private boolean isDistributedNodeLabelsConf;
+  private boolean isDelegatedCentralizedNodeLabelsConf;
 
   public ResourceTrackerService(RMContext rmContext,
       NodesListManager nodesListManager,
@@ -151,6 +151,8 @@ public class ResourceTrackerService extends AbstractService 
implements
 
     isDistributedNodeLabelsConf =
         YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
+    isDelegatedCentralizedNodeLabelsConf = YarnConfiguration
+        .isDelegatedCentralizedNodeLabelConfiguration(conf);
 
     super.serviceInit(conf);
   }
@@ -241,17 +243,6 @@ public class ResourceTrackerService extends 
AbstractService implements
     }
   }
 
-  static Set<String> convertToStringSet(Set<NodeLabel> nodeLabels) {
-    if (null == nodeLabels) {
-      return null;
-    }
-    Set<String> labels = new HashSet<String>();
-    for (NodeLabel label : nodeLabels) {
-      labels.add(label.getName());
-    }
-    return labels;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public RegisterNodeManagerResponse registerNodeManager(
@@ -353,7 +344,8 @@ public class ResourceTrackerService extends AbstractService 
implements
     }
 
     // Update node's labels to RM's NodeLabelManager.
-    Set<String> nodeLabels = convertToStringSet(request.getNodeLabels());
+    Set<String> nodeLabels = NodeLabelsUtils.convertToStringSet(
+        request.getNodeLabels());
     if (isDistributedNodeLabelsConf && nodeLabels != null) {
       try {
         updateNodeLabelsFromNMReport(nodeLabels, nodeId);
@@ -363,6 +355,8 @@ public class ResourceTrackerService extends AbstractService 
implements
         response.setDiagnosticsMessage(ex.getMessage());
         response.setAreNodeLabelsAcceptedByRM(false);
       }
+    } else if (isDelegatedCentralizedNodeLabelsConf) {
+      
this.rmContext.getRMDelegatedNodeLabelsUpdater().updateNodeLabels(nodeId);
     }
 
     StringBuilder message = new StringBuilder();
@@ -480,7 +474,8 @@ public class ResourceTrackerService extends AbstractService 
implements
     if (isDistributedNodeLabelsConf && request.getNodeLabels() != null) {
       try {
         updateNodeLabelsFromNMReport(
-            convertToStringSet(request.getNodeLabels()), nodeId);
+            NodeLabelsUtils.convertToStringSet(request.getNodeLabels()),
+            nodeId);
         nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true);
       } catch (IOException ex) {
         //ensure the error message is captured and sent across in response

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsUtils.java
new file mode 100644
index 0000000..1645d13
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsUtils.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+
+/**
+ * Node labels utilities.
+ */
+public final class NodeLabelsUtils {
+  private static final Log LOG = LogFactory.getLog(NodeLabelsUtils.class);
+
+  private NodeLabelsUtils() { /* Hidden constructor */ }
+
+  public static Set<String> convertToStringSet(Set<NodeLabel> nodeLabels) {
+    if (null == nodeLabels) {
+      return null;
+    }
+    Set<String> labels = new HashSet<String>();
+    for (NodeLabel label : nodeLabels) {
+      labels.add(label.getName());
+    }
+    return labels;
+  }
+
+  public static void verifyCentralizedNodeLabelConfEnabled(String operation,
+      boolean isCentralizedNodeLabelConfiguration) throws IOException {
+    if (!isCentralizedNodeLabelConfiguration) {
+      String msg =
+          String.format("Error when invoke method=%s because "
+              + "centralized node label configuration is not enabled.",
+              operation);
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMDelegatedNodeLabelsUpdater.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMDelegatedNodeLabelsUpdater.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMDelegatedNodeLabelsUpdater.java
new file mode 100644
index 0000000..0c06211
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMDelegatedNodeLabelsUpdater.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Update nodes labels map for Resource Manager periodically. It collects
+ * nodes labels from {@link RMNodeLabelsMappingProvider} and updates the
+ * nodes -> labels map via {@link RMNodeLabelsManager}. This service is
+ * enabled when configuration "yarn.node-labels.configuration-type" is
+ * set to "delegated-centralized".
+ */
+public class RMDelegatedNodeLabelsUpdater extends CompositeService {
+
+  private static final Log LOG = LogFactory
+      .getLog(RMDelegatedNodeLabelsUpdater.class);
+
+  public static final long DISABLE_DELEGATED_NODE_LABELS_UPDATE = -1;
+
+  // Timer used to schedule node labels fetching
+  private Timer nodeLabelsScheduler;
+  // 30 seconds
+  @VisibleForTesting
+  public long nodeLabelsUpdateInterval = 30 * 1000;
+
+  private Set<NodeId> newlyRegisteredNodes = new HashSet<NodeId>();
+  // Lock to protect newlyRegisteredNodes
+  private Object lock = new Object();
+  private long lastAllNodesLabelUpdateMills = 0L;
+  private long allNodesLabelUpdateInterval;
+
+  private RMNodeLabelsMappingProvider rmNodeLabelsMappingProvider;
+
+  private RMContext rmContext;
+
+  public RMDelegatedNodeLabelsUpdater(RMContext rmContext) {
+    super("RMDelegatedNodeLabelsUpdater");
+    this.rmContext = rmContext;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    allNodesLabelUpdateInterval = conf.getLong(
+        YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS);
+    rmNodeLabelsMappingProvider = createRMNodeLabelsMappingProvider(conf);
+    addService(rmNodeLabelsMappingProvider);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    nodeLabelsScheduler = new Timer(
+        "RMDelegatedNodeLabelsUpdater-Timer", true);
+    TimerTask delegatedNodeLabelsUpdaterTimerTask =
+        new RMDelegatedNodeLabelsUpdaterTimerTask();
+    nodeLabelsScheduler.scheduleAtFixedRate(
+        delegatedNodeLabelsUpdaterTimerTask,
+        nodeLabelsUpdateInterval,
+        nodeLabelsUpdateInterval);
+
+    super.serviceStart();
+  }
+
+  /**
+   * Terminate the timer.
+   * @throws Exception
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (nodeLabelsScheduler != null) {
+      nodeLabelsScheduler.cancel();
+    }
+    super.serviceStop();
+  }
+
+  private class RMDelegatedNodeLabelsUpdaterTimerTask extends TimerTask {
+    @Override
+    public void run() {
+      Set<NodeId> nodesToUpdateLabels = null;
+      boolean isUpdatingAllNodes = false;
+
+      if (allNodesLabelUpdateInterval != DISABLE_DELEGATED_NODE_LABELS_UPDATE) 
{
+        long elapsedTimeSinceLastUpdate =
+            System.currentTimeMillis() - lastAllNodesLabelUpdateMills;
+        if (elapsedTimeSinceLastUpdate > allNodesLabelUpdateInterval) {
+          nodesToUpdateLabels =
+              Collections.unmodifiableSet(rmContext.getRMNodes().keySet());
+          isUpdatingAllNodes = true;
+        }
+      }
+
+      if (nodesToUpdateLabels == null && !newlyRegisteredNodes.isEmpty()) {
+        synchronized (lock) {
+          if (!newlyRegisteredNodes.isEmpty()) {
+            nodesToUpdateLabels = new HashSet<NodeId>(newlyRegisteredNodes);
+          }
+        }
+      }
+
+      try {
+        if (nodesToUpdateLabels != null && !nodesToUpdateLabels.isEmpty()) {
+          updateNodeLabelsInternal(nodesToUpdateLabels);
+          if (isUpdatingAllNodes) {
+            lastAllNodesLabelUpdateMills = System.currentTimeMillis();
+          }
+          synchronized (lock) {
+            newlyRegisteredNodes.removeAll(nodesToUpdateLabels);
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("Failed to update node Labels", e);
+      }
+    }
+  }
+
+  private void updateNodeLabelsInternal(Set<NodeId> nodes)
+      throws IOException {
+    Map<NodeId, Set<NodeLabel>> labelsUpdated =
+        rmNodeLabelsMappingProvider.getNodeLabels(nodes);
+    if (labelsUpdated != null && labelsUpdated.size() != 0) {
+      Map<NodeId, Set<String>> nodeToLabels =
+          new HashMap<NodeId, Set<String>>(labelsUpdated.size());
+      for (Map.Entry<NodeId, Set<NodeLabel>> entry
+          : labelsUpdated.entrySet()) {
+        nodeToLabels.put(entry.getKey(),
+            NodeLabelsUtils.convertToStringSet(entry.getValue()));
+      }
+      rmContext.getNodeLabelManager().replaceLabelsOnNode(nodeToLabels);
+    }
+  }
+
+  /**
+   * Get the RMNodeLabelsMappingProvider which is used to provide node labels.
+   */
+  private RMNodeLabelsMappingProvider createRMNodeLabelsMappingProvider(
+      Configuration conf) throws IOException {
+    RMNodeLabelsMappingProvider nodeLabelsMappingProvider = null;
+    try {
+      Class<? extends RMNodeLabelsMappingProvider> labelsProviderClass =
+          conf.getClass(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG,
+              null, RMNodeLabelsMappingProvider.class);
+      if (labelsProviderClass != null) {
+        nodeLabelsMappingProvider = labelsProviderClass.newInstance();
+      }
+    } catch (InstantiationException | IllegalAccessException
+        | RuntimeException e) {
+      LOG.error("Failed to create RMNodeLabelsMappingProvider based on"
+          + " Configuration", e);
+      throw new IOException("Failed to create RMNodeLabelsMappingProvider : "
+          + e.getMessage(), e);
+    }
+
+    if (nodeLabelsMappingProvider == null) {
+      String msg = "RMNodeLabelsMappingProvider should be configured when "
+          + "delegated-centralized node label configuration is enabled";
+      LOG.error(msg);
+      throw new IOException(msg);
+    } else if (LOG.isDebugEnabled()) {
+      LOG.debug("RM Node labels mapping provider class is : "
+          + nodeLabelsMappingProvider.getClass().toString());
+    }
+
+    return nodeLabelsMappingProvider;
+  }
+
+  /**
+   * Update node labels for a specified node.
+   * @param node the node to update node labels
+   */
+  public void updateNodeLabels(NodeId node) {
+    synchronized (lock) {
+      newlyRegisteredNodes.add(node);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsMappingProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsMappingProvider.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsMappingProvider.java
new file mode 100644
index 0000000..28bc54b
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsMappingProvider.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+
+/**
+ * Interface which is responsible for providing the node -> labels map.
+ */
+public abstract class RMNodeLabelsMappingProvider extends AbstractService {
+
+  public RMNodeLabelsMappingProvider(String name) {
+    super(name);
+  }
+
+  /**
+   * Provides the labels. It is expected to give same Labels
+   * continuously until there is a change in labels.
+   *
+   * @param nodes to fetch labels
+   * @return Set of node label strings applicable for a node
+   */
+  public abstract Map<NodeId, Set<NodeLabel>> getNodeLabels(Set<NodeId> nodes);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index 2410053..a0a6bda 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -111,6 +111,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import 
org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -177,7 +178,7 @@ public class RMWebServices {
   private @Context HttpServletResponse response;
 
   @VisibleForTesting
-  boolean isDistributedNodeLabelConfiguration = false;
+  boolean isCentralizedNodeLabelConfiguration = true;
 
   public final static String DELEGATION_TOKEN_HEADER =
       "Hadoop-YARN-RM-Delegation-Token";
@@ -186,19 +187,8 @@ public class RMWebServices {
   public RMWebServices(final ResourceManager rm, Configuration conf) {
     this.rm = rm;
     this.conf = conf;
-    isDistributedNodeLabelConfiguration =
-        YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
-  }
-
-  private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation)
-      throws IOException {
-    if (isDistributedNodeLabelConfiguration) {
-      String msg =
-          String.format("Error when invoke method=%s because of "
-              + "distributed node label configuration enabled.", operation);
-      LOG.error(msg);
-      throw new IOException(msg);
-    }
+    isCentralizedNodeLabelConfiguration =
+        YarnConfiguration.isCentralizedNodeLabelConfiguration(conf);
   }
 
   RMWebServices(ResourceManager rm, Configuration conf,
@@ -892,7 +882,8 @@ public class RMWebServices {
       String operation) throws IOException {
     init();
 
-    checkAndThrowIfDistributedNodeLabelConfEnabled("replaceLabelsOnNode");
+    NodeLabelsUtils.verifyCentralizedNodeLabelConfEnabled(
+        "replaceLabelsOnNode", isCentralizedNodeLabelConfiguration);
 
     UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
     if (callerUGI == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
index 7e75cfa..e61d9fe 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
@@ -872,12 +872,12 @@ public class TestRMAdminService {
   }
 
   @Test(expected = YarnException.class)
-  public void testModifyLabelsOnNodesWithDistributedConfigurationEnabled()
+  public void testModifyLabelsOnNodesWithCentralizedConfigurationDisabled()
       throws IOException, YarnException {
     // create RM and set it's ACTIVE, and set distributed node label
     // configuration to true
     MockRM rm = new MockRM();
-    rm.adminService.isDistributedNodeLabelConfiguration = true;
+    rm.adminService.isCentralizedNodeLabelConfiguration = false;
 
     ((RMContextImpl) rm.getRMContext())
         .setHAServiceState(HAServiceState.ACTIVE);
@@ -893,14 +893,14 @@ public class TestRMAdminService {
   }
 
   @Test
-  public void testRemoveClusterNodeLabelsWithDistributedConfigurationEnabled()
+  public void testRemoveClusterNodeLabelsWithCentralizedConfigurationDisabled()
       throws IOException, YarnException {
     // create RM and set it's ACTIVE
     MockRM rm = new MockRM();
     ((RMContextImpl) rm.getRMContext())
         .setHAServiceState(HAServiceState.ACTIVE);
     RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager();
-    rm.adminService.isDistributedNodeLabelConfiguration = true;
+    rm.adminService.isCentralizedNodeLabelConfiguration = false;
 
     // by default, distributed configuration for node label is disabled, this
     // should pass

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 94a0e4c..e42ed91 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -61,6 +61,7 @@ import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -362,7 +363,7 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
     Assert.assertEquals("Action should be normal on valid Node Labels",
         NodeAction.NORMAL, response.getNodeAction());
     assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
-        
ResourceTrackerService.convertToStringSet(registerReq.getNodeLabels()));
+        NodeLabelsUtils.convertToStringSet(registerReq.getNodeLabels()));
     Assert.assertTrue("Valid Node Labels were not accepted by RM",
         response.getAreNodeLabelsAcceptedByRM());
     rm.stop();
@@ -590,7 +591,7 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
     Assert.assertEquals("InValid Node Labels were not accepted by RM",
         NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction());
     assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
-        
ResourceTrackerService.convertToStringSet(heartbeatReq.getNodeLabels()));
+        NodeLabelsUtils.convertToStringSet(heartbeatReq.getNodeLabels()));
     Assert.assertTrue("Valid Node Labels were not accepted by RM",
         nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM());
     

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMDelegatedNodeLabelsUpdater.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMDelegatedNodeLabelsUpdater.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMDelegatedNodeLabelsUpdater.java
new file mode 100644
index 0000000..928124d
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMDelegatedNodeLabelsUpdater.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.YarnVersionInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+public class TestRMDelegatedNodeLabelsUpdater extends NodeLabelTestBase {
+  private YarnConfiguration conf;
+  private static Map<NodeId, Set<NodeLabel>> nodeLabelsMap = Maps.newHashMap();
+
+  @Before
+  public void setup() {
+    conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
+        YarnConfiguration.DELEGATED_CENTALIZED_NODELABEL_CONFIGURATION_TYPE);
+    conf.setClass(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG,
+        DummyRMNodeLabelsMappingProvider.class,
+        RMNodeLabelsMappingProvider.class);
+  }
+
+  @Test
+  public void testRMNodeLabelsMappingProviderConfiguration() {
+    conf.unset(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG);
+    try {
+      MockRM rm = new MockRM(conf);
+      rm.init(conf);
+      rm.start();
+      Assert.fail("Expected an exception");
+    } catch (Exception e) {
+      // expected an exception
+      Assert.assertTrue(e.getMessage().contains(
+          "RMNodeLabelsMappingProvider should be configured"));
+    }
+  }
+
+  @Test
+  public void testWithNodeLabelUpdateEnabled() throws Exception {
+    conf.setLong(YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
+        1000);
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    
rm.getRMContext().getRMDelegatedNodeLabelsUpdater().nodeLabelsUpdateInterval
+        = 3 * 1000;
+    rm.start();
+
+    RMNodeLabelsManager mgr = rm.getRMContext().getNodeLabelManager();
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
+
+    NodeId nodeId = toNodeId("h1:1234");
+    assertEquals(0, mgr.getLabelsOnNode(nodeId).size());
+    updateNodeLabels(nodeId, "x");
+    registerNode(rm, nodeId);
+    Thread.sleep(4000);
+    assertCollectionEquals(ImmutableSet.of("x"), mgr.getLabelsOnNode(nodeId));
+
+    // Ensure that node labels are updated if NodeLabelsProvider
+    // gives different labels
+    updateNodeLabels(nodeId, "y");
+    Thread.sleep(4000);
+    assertCollectionEquals(ImmutableSet.of("y"), mgr.getLabelsOnNode(nodeId));
+
+    rm.stop();
+  }
+
+  @Test
+  public void testWithNodeLabelUpdateDisabled() throws Exception {
+    conf.setLong(YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
+        RMDelegatedNodeLabelsUpdater.DISABLE_DELEGATED_NODE_LABELS_UPDATE);
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    
rm.getRMContext().getRMDelegatedNodeLabelsUpdater().nodeLabelsUpdateInterval
+        = 3 * 1000;
+    rm.start();
+
+    RMNodeLabelsManager mgr = rm.getRMContext().getNodeLabelManager();
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
+
+    NodeId nodeId = toNodeId("h1:1234");
+    updateNodeLabels(nodeId, "x");
+    registerNode(rm, nodeId);
+    Thread.sleep(4000);
+    // Ensure that even though timer is not run, node labels are fetched
+    // when node is registered
+    assertCollectionEquals(ImmutableSet.of("x"), mgr.getLabelsOnNode(nodeId));
+
+    rm.stop();
+  }
+
+  private void registerNode(ResourceManager rm, NodeId nodeId)
+      throws YarnException, IOException {
+    ResourceTrackerService resourceTrackerService =
+        rm.getResourceTrackerService();
+    RegisterNodeManagerRequest req =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    Resource capability = BuilderUtils.newResource(1024, 1);
+    req.setResource(capability);
+    req.setNodeId(nodeId);
+    req.setHttpPort(1234);
+    req.setNMVersion(YarnVersionInfo.getVersion());
+    resourceTrackerService.registerNodeManager(req);
+  }
+
+  private void updateNodeLabels(NodeId nodeId, String... nodeLabelsStr) {
+    nodeLabelsMap.put(nodeId, toNodeLabelSet(nodeLabelsStr));
+  }
+
+  public static class DummyRMNodeLabelsMappingProvider extends
+      RMNodeLabelsMappingProvider {
+    public DummyRMNodeLabelsMappingProvider() {
+      super("DummyRMNodeLabelsMappingProvider");
+    }
+
+    @Override
+    public Map<NodeId, Set<NodeLabel>> getNodeLabels(Set<NodeId> nodes) {
+      Map<NodeId, Set<NodeLabel>> nodeLabels = Maps.newHashMap();
+      for(NodeId node : nodes) {
+        nodeLabels.put(node, nodeLabelsMap.get(node));
+      }
+      return nodeLabels;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28edc7b1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java
index 53a9902..472dab6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java
@@ -445,8 +445,8 @@ public class TestRMWebServicesNodeLabels extends 
JerseyTestBase {
             .post(ClientResponse.class);
     LOG.info("posted node nodelabel");
 
-    //setting rmWebService for Distributed NodeLabel Configuration
-    rmWebService.isDistributedNodeLabelConfiguration = true;
+    //setting rmWebService for non Centralized NodeLabel Configuration
+    rmWebService.isCentralizedNodeLabelConfiguration = false;
 
     // Case1 : Replace labels using node-to-labels
     ntli = new NodeToLabelsEntryList();

Reply via email to