YARN-3671. Integrate Federation services with ResourceManager. Contributed by 
Subru Krishnan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5ae955b3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5ae955b3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5ae955b3

Branch: refs/heads/YARN-2915
Commit: 5ae955b3501816807fa7b801d4ef2a31ba7e8684
Parents: b5af206
Author: Jian He <jia...@apache.org>
Authored: Tue Aug 30 12:20:52 2016 +0800
Committer: Subru Krishnan <su...@apache.org>
Committed: Mon Sep 19 12:58:34 2016 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  11 +-
 .../yarn/conf/TestYarnConfigurationFields.java  |   4 +-
 .../failover/FederationProxyProviderUtil.java   |   2 +-
 .../FederationRMFailoverProxyProvider.java      |   4 +-
 ...ationMembershipStateStoreInputValidator.java |   7 +-
 .../TestFederationStateStoreInputValidator.java |  10 +-
 .../server/resourcemanager/ResourceManager.java |  26 ++
 .../FederationStateStoreHeartbeat.java          | 108 +++++++
 .../federation/FederationStateStoreService.java | 304 +++++++++++++++++++
 .../federation/package-info.java                |  17 ++
 .../webapp/dao/ClusterMetricsInfo.java          |   5 +-
 .../TestFederationRMStateStoreService.java      | 170 +++++++++++
 12 files changed, 648 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae955b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 293df71..50dc154 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2494,9 +2494,6 @@ public class YarnConfiguration extends Configuration {
       FEDERATION_PREFIX + "failover.enabled";
   public static final boolean DEFAULT_FEDERATION_FAILOVER_ENABLED = true;
 
-  public static final String FEDERATION_SUBCLUSTER_ID =
-      FEDERATION_PREFIX + "sub-cluster.id";
-
   public static final String FEDERATION_STATESTORE_CLIENT_CLASS =
       FEDERATION_PREFIX + "state-store.class";
 
@@ -2509,6 +2506,14 @@ public class YarnConfiguration extends Configuration {
   // 5 minutes
   public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
 
+  public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
+      FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";
+
+  // 5 minutes
+  public static final int
+      DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
+      5 * 60;
+
   public static final String FEDERATION_MACHINE_LIST =
       FEDERATION_PREFIX + "machine-list";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae955b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 63413eb..56b13c4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -96,9 +96,9 @@ public class TestYarnConfigurationFields extends 
TestConfigurationFieldsBase {
     configurationPropsToSkipCompare
         .add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS);
     configurationPropsToSkipCompare
-        .add(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
-    configurationPropsToSkipCompare
         .add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
 
     // Ignore blacklisting nodes for AM failures feature since it is still a
     // "work in progress"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae955b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
index a986008..18f1338 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
@@ -134,7 +134,7 @@ public final class FederationProxyProviderUtil {
   // are based out of conf
   private static void updateConf(Configuration conf,
       SubClusterId subClusterId) {
-    conf.set(YarnConfiguration.FEDERATION_SUBCLUSTER_ID, subClusterId.getId());
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
     // In a Federation setting, we will connect to not just the local cluster 
RM
     // but also multiple external RMs. The membership information of all the 
RMs
     // that are currently

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae955b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
index 90a9239..0ffab0b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
@@ -74,8 +74,8 @@ public class FederationRMFailoverProxyProvider<T>
     this.protocol = proto;
     this.rmProxy.checkAllowedProtocols(this.protocol);
     String clusterId =
-        configuration.get(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
-    Preconditions.checkNotNull(clusterId, "Missing Federation SubClusterId");
+        configuration.get(YarnConfiguration.RM_CLUSTER_ID);
+    Preconditions.checkNotNull(clusterId, "Missing RM ClusterId");
     this.subClusterId = SubClusterId.newInstance(clusterId);
     this.facade = facade.getInstance();
     if (configuration instanceof YarnConfiguration) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae955b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
index b587ee5..ff9d8e9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
@@ -159,7 +159,10 @@ public final class 
FederationMembershipStateStoreInputValidator {
   }
 
   /**
-   * Validate if the SubCluster Info are present or not.
+   * Validate if all the required fields on {@link SubClusterInfo} are present
+   * or not. {@code Capability} will be empty as the corresponding
+   * {@code ResourceManager} is in the process of initialization during
+   * registration.
    *
    * @param subClusterInfo the information of the subcluster to be verified
    * @throws FederationStateStoreInvalidInputException if the SubCluster Info
@@ -194,8 +197,6 @@ public final class 
FederationMembershipStateStoreInputValidator {
     // validate subcluster state
     checkSubClusterState(subClusterInfo.getState());
 
-    // validate subcluster capability
-    checkCapability(subClusterInfo.getCapability());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae955b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
index 13175ae..b95f17a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
@@ -242,11 +242,8 @@ public class TestFederationStateStoreInputValidator {
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
           .validateSubClusterRegisterRequest(request);
-      Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
-      LOG.info(e.getMessage());
-      Assert.assertTrue(
-          e.getMessage().startsWith("Invalid capability information."));
+      Assert.fail(e.getMessage());
     }
 
     // Execution with Empty Capability
@@ -260,11 +257,8 @@ public class TestFederationStateStoreInputValidator {
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
           .validateSubClusterRegisterRequest(request);
-      Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
-      LOG.info(e.getMessage());
-      Assert.assertTrue(
-          e.getMessage().startsWith("Invalid capability information."));
+      Assert.fail(e.getMessage());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae955b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index bf72fc1..15330bc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import 
org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import 
org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
 import 
org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import 
org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService;
 import 
org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher;
 import 
org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import 
org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
@@ -175,6 +176,7 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
   protected RMAppManager rmAppManager;
   protected ApplicationACLsManager applicationACLsManager;
   protected QueueACLsManager queueACLsManager;
+  private FederationStateStoreService federationStateStoreService;
   private WebApp webApp;
   private AppReportFetcher fetcher = null;
   protected ResourceTrackerService resourceTracker;
@@ -467,6 +469,10 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
     return new RMTimelineCollectorManager(rmContext);
   }
 
+  private FederationStateStoreService createFederationStateStoreService() {
+    return new FederationStateStoreService(rmContext);
+  }
+
   protected SystemMetricsPublisher createSystemMetricsPublisher() {
     SystemMetricsPublisher publisher;
     if (YarnConfiguration.timelineServiceEnabled(conf) &&
@@ -689,6 +695,20 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
         delegationTokenRenewer.setRMContext(rmContext);
       }
 
+      if(HAUtil.isFederationEnabled(conf)) {
+        String cId = YarnConfiguration.getClusterId(conf);
+        if (cId.isEmpty()) {
+          String errMsg =
+              "Cannot initialize RM as Federation is enabled"
+                  + " but cluster id is not configured.";
+          LOG.error(errMsg);
+          throw new YarnRuntimeException(errMsg);
+        }
+        federationStateStoreService = createFederationStateStoreService();
+        addIfService(federationStateStoreService);
+        LOG.info("Initialized Federation membership.");
+      }
+
       new RMNMInfo(rmContext, scheduler);
 
       super.serviceInit(conf);
@@ -1262,6 +1282,12 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
   }
 
   @Private
+  @VisibleForTesting
+  public FederationStateStoreService getFederationStateStoreService() {
+    return this.federationStateStoreService;
+  }
+
+  @Private
   WebApp getWebapp() {
     return this.webApp;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae955b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreHeartbeat.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreHeartbeat.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreHeartbeat.java
new file mode 100644
index 0000000..a4618a2
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreHeartbeat.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.federation;
+
+import java.io.StringWriter;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import com.sun.jersey.api.json.JSONMarshaller;
+
+/**
+ * Periodic heart beat from a <code>ResourceManager</code> participating in
+ * federation to indicate liveliness. The heart beat publishes the current
+ * capabilities as represented by {@link ClusterMetricsInfo} of the sub 
cluster.
+ *
+ */
+public class FederationStateStoreHeartbeat implements Runnable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationStateStoreHeartbeat.class);
+
+  private SubClusterId subClusterId;
+  private FederationStateStore stateStoreService;
+
+  private final ResourceScheduler rs;
+
+  private StringWriter currentClusterState;
+  private JSONJAXBContext jc;
+  private JSONMarshaller marshaller;
+  private String capability;
+
+  public FederationStateStoreHeartbeat(SubClusterId subClusterId,
+      FederationStateStore stateStoreClient, ResourceScheduler scheduler) {
+    this.stateStoreService = stateStoreClient;
+    this.subClusterId = subClusterId;
+    this.rs = scheduler;
+    // Initialize the JAXB Marshaller
+    this.currentClusterState = new StringWriter();
+    try {
+      this.jc = new JSONJAXBContext(
+          JSONConfiguration.mapped().rootUnwrapping(false).build(),
+          ClusterMetricsInfo.class);
+      marshaller = jc.createJSONMarshaller();
+    } catch (JAXBException e) {
+      LOG.warn("Exception while trying to initialize JAXB context.", e);
+    }
+    LOG.info("Initialized Federation membership for cluster with timestamp:  "
+        + ResourceManager.getClusterTimeStamp());
+  }
+
+  /**
+   * Get the current cluster state as a JSON string representation of the
+   * {@link ClusterMetricsInfo}.
+   */
+  private void updateClusterState() {
+    try {
+      // get the current state
+      currentClusterState.getBuffer().setLength(0);
+      ClusterMetricsInfo clusterMetricsInfo = new ClusterMetricsInfo(rs);
+      marshaller.marshallToJSON(clusterMetricsInfo, currentClusterState);
+      capability = currentClusterState.toString();
+    } catch (Exception e) {
+      LOG.warn("Exception while trying to generate cluster state,"
+          + " so reverting to last know state.", e);
+    }
+  }
+
+  @Override
+  public synchronized void run() {
+    try {
+      updateClusterState();
+      SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
+          .newInstance(subClusterId, SubClusterState.SC_RUNNING, capability);
+      stateStoreService.subClusterHeartbeat(request);
+      LOG.debug("Sending the heartbeat with capability: {}", capability);
+    } catch (Exception e) {
+      LOG.warn("Exception when trying to heartbeat: ", e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae955b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
new file mode 100644
index 0000000..9a01d7e
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.federation;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import 
org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Implements {@link FederationStateStore} and provides a service for
+ * participating in the federation membership.
+ */
+public class FederationStateStoreService extends AbstractService
+    implements FederationStateStore {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(FederationStateStoreService.class);
+
+  private Configuration config;
+  private ScheduledExecutorService scheduledExecutorService;
+  private FederationStateStoreHeartbeat stateStoreHeartbeat;
+  private FederationStateStore stateStoreClient = null;
+  private SubClusterId subClusterId;
+  private long heartbeatInterval;
+  private RMContext rmContext;
+
+  public FederationStateStoreService(RMContext rmContext) {
+    super(FederationStateStoreService.class.getName());
+    LOG.info("FederationStateStoreService initialized");
+    this.rmContext = rmContext;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+
+    this.config = conf;
+
+    RetryPolicy retryPolicy =
+        FederationStateStoreFacade.createRetryPolicy(conf);
+
+    this.stateStoreClient =
+        (FederationStateStore) FederationStateStoreFacade.createRetryInstance(
+            conf, YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS,
+            YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS,
+            FederationStateStore.class, retryPolicy);
+    this.stateStoreClient.init(conf);
+    LOG.info("Initialized state store client class");
+
+    this.subClusterId =
+        SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
+
+    heartbeatInterval = conf.getLong(
+        YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS,
+        
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
+    if (heartbeatInterval <= 0) {
+      heartbeatInterval =
+          
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS;
+    }
+    LOG.info("Initialized federation membership service.");
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+
+    registerAndInitializeHeartbeat();
+
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    Exception ex = null;
+    try {
+      if (this.scheduledExecutorService != null
+          && !this.scheduledExecutorService.isShutdown()) {
+        this.scheduledExecutorService.shutdown();
+        LOG.info("Stopped federation membership heartbeat");
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to shutdown ScheduledExecutorService", e);
+      ex = e;
+    }
+
+    if (this.stateStoreClient != null) {
+      try {
+        deregisterSubCluster(SubClusterDeregisterRequest
+            .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
+      } finally {
+        this.stateStoreClient.close();
+      }
+    }
+
+    if (ex != null) {
+      throw ex;
+    }
+  }
+
+  // Return a client accessible string representation of the service address.
+  private String getServiceAddress(InetSocketAddress address) {
+    InetSocketAddress socketAddress = NetUtils.getConnectAddress(address);
+    return socketAddress.getAddress().getHostAddress() + ":"
+        + socketAddress.getPort();
+  }
+
+  private void registerAndInitializeHeartbeat() {
+    String clientRMAddress =
+        getServiceAddress(rmContext.getClientRMService().getBindAddress());
+    String amRMAddress = getServiceAddress(
+        rmContext.getApplicationMasterService().getBindAddress());
+    String rmAdminAddress = getServiceAddress(
+        config.getSocketAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_ADMIN_PORT));
+    String webAppAddress =
+        WebAppUtils.getResolvedRemoteRMWebAppURLWithoutScheme(config);
+
+    SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
+        amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress,
+        SubClusterState.SC_NEW, ResourceManager.getClusterTimeStamp(), "");
+    try {
+      
registerSubCluster(SubClusterRegisterRequest.newInstance(subClusterInfo));
+      LOG.info("Successfully registered for federation subcluster: {}",
+          subClusterInfo);
+    } catch (Exception e) {
+      throw new YarnRuntimeException(
+          "Failed to register Federation membership with the StateStore", e);
+    }
+    stateStoreHeartbeat = new FederationStateStoreHeartbeat(subClusterId,
+        stateStoreClient, rmContext.getScheduler());
+    scheduledExecutorService =
+        HadoopExecutors.newSingleThreadScheduledExecutor();
+    scheduledExecutorService.scheduleWithFixedDelay(stateStoreHeartbeat,
+        heartbeatInterval, heartbeatInterval, TimeUnit.SECONDS);
+    LOG.info("Started federation membership heartbeat with interval: {}",
+        heartbeatInterval);
+  }
+
+  @VisibleForTesting
+  public FederationStateStore getStateStoreClient() {
+    return stateStoreClient;
+  }
+
+  @VisibleForTesting
+  public FederationStateStoreHeartbeat getStateStoreHeartbeatThread() {
+    return stateStoreHeartbeat;
+  }
+
+  @Override
+  public Version getCurrentVersion() {
+    return stateStoreClient.getCurrentVersion();
+  }
+
+  @Override
+  public Version loadVersion() {
+    return stateStoreClient.getCurrentVersion();
+  }
+
+  @Override
+  public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
+      GetSubClusterPolicyConfigurationRequest request) throws YarnException {
+    return stateStoreClient.getPolicyConfiguration(request);
+  }
+
+  @Override
+  public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
+      SetSubClusterPolicyConfigurationRequest request) throws YarnException {
+    return stateStoreClient.setPolicyConfiguration(request);
+  }
+
+  @Override
+  public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
+      GetSubClusterPoliciesConfigurationsRequest request) throws YarnException 
{
+    return stateStoreClient.getPoliciesConfigurations(request);
+  }
+
+  @Override
+  public SubClusterRegisterResponse registerSubCluster(
+      SubClusterRegisterRequest registerSubClusterRequest)
+      throws YarnException {
+    return stateStoreClient.registerSubCluster(registerSubClusterRequest);
+  }
+
+  @Override
+  public SubClusterDeregisterResponse deregisterSubCluster(
+      SubClusterDeregisterRequest subClusterDeregisterRequest)
+      throws YarnException {
+    return stateStoreClient.deregisterSubCluster(subClusterDeregisterRequest);
+  }
+
+  @Override
+  public SubClusterHeartbeatResponse subClusterHeartbeat(
+      SubClusterHeartbeatRequest subClusterHeartbeatRequest)
+      throws YarnException {
+    return stateStoreClient.subClusterHeartbeat(subClusterHeartbeatRequest);
+  }
+
+  @Override
+  public GetSubClusterInfoResponse getSubCluster(
+      GetSubClusterInfoRequest subClusterRequest) throws YarnException {
+    return stateStoreClient.getSubCluster(subClusterRequest);
+  }
+
+  @Override
+  public GetSubClustersInfoResponse getSubClusters(
+      GetSubClustersInfoRequest subClustersRequest) throws YarnException {
+    return stateStoreClient.getSubClusters(subClustersRequest);
+  }
+
+  @Override
+  public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
+      AddApplicationHomeSubClusterRequest request) throws YarnException {
+    return stateStoreClient.addApplicationHomeSubCluster(request);
+  }
+
+  @Override
+  public UpdateApplicationHomeSubClusterResponse 
updateApplicationHomeSubCluster(
+      UpdateApplicationHomeSubClusterRequest request) throws YarnException {
+    return stateStoreClient.updateApplicationHomeSubCluster(request);
+  }
+
+  @Override
+  public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
+      GetApplicationHomeSubClusterRequest request) throws YarnException {
+    return stateStoreClient.getApplicationHomeSubCluster(request);
+  }
+
+  @Override
+  public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
+      GetApplicationsHomeSubClusterRequest request) throws YarnException {
+    return stateStoreClient.getApplicationsHomeSubCluster(request);
+  }
+
+  @Override
+  public DeleteApplicationHomeSubClusterResponse 
deleteApplicationHomeSubCluster(
+      DeleteApplicationHomeSubClusterRequest request) throws YarnException {
+    return stateStoreClient.deleteApplicationHomeSubCluster(request);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae955b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/package-info.java
new file mode 100644
index 0000000..47c7c65
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/package-info.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.federation;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae955b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
index 1789e09..a7f563d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
@@ -64,7 +64,10 @@ public class ClusterMetricsInfo {
   } // JAXB needs this
 
   public ClusterMetricsInfo(final ResourceManager rm) {
-    ResourceScheduler rs = rm.getResourceScheduler();
+    this(rm.getResourceScheduler());
+  }
+
+  public ClusterMetricsInfo(final ResourceScheduler rs) {
     QueueMetrics metrics = rs.getRootQueueMetrics();
     ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae955b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java
new file mode 100644
index 0000000..30f69b5
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.federation;
+
+import java.io.IOException;
+import java.io.StringReader;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import com.sun.jersey.api.json.JSONUnmarshaller;
+
+/**
+ * Unit tests for FederationStateStoreService.
+ */
+public class TestFederationRMStateStoreService {
+
+  private final HAServiceProtocol.StateChangeRequestInfo requestInfo =
+      new HAServiceProtocol.StateChangeRequestInfo(
+          HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+  private final SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
+  private final GetSubClusterInfoRequest request =
+      GetSubClusterInfoRequest.newInstance(subClusterId);
+
+  private Configuration conf;
+  private FederationStateStore stateStore;
+  private long lastHearbeatTS = 0;
+  private JSONJAXBContext jc;
+  private JSONUnmarshaller unmarshaller;
+
+  @Before
+  public void setUp() throws IOException, YarnException, JAXBException {
+    conf = new YarnConfiguration();
+    jc = new JSONJAXBContext(
+        JSONConfiguration.mapped().rootUnwrapping(false).build(),
+        ClusterMetricsInfo.class);
+    unmarshaller = jc.createJSONUnmarshaller();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    unmarshaller = null;
+    jc = null;
+  }
+
+  @Test
+  public void testFederationStateStoreService() throws Exception {
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
+    final MockRM rm = new MockRM(conf);
+
+    // Initially there should be no entry for the sub-cluster
+    rm.init(conf);
+    stateStore = rm.getFederationStateStoreService().getStateStoreClient();
+    try {
+      stateStore.getSubCluster(request);
+      Assert.fail("There should be no entry for the sub-cluster.");
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage().endsWith("does not exist"));
+    }
+
+    // Validate if sub-cluster is registered
+    rm.start();
+    String capability = checkSubClusterInfo(SubClusterState.SC_NEW);
+    Assert.assertTrue(capability.isEmpty());
+
+    // Heartbeat to see if sub-cluster transitions to running
+    FederationStateStoreHeartbeat storeHeartbeat =
+        rm.getFederationStateStoreService().getStateStoreHeartbeatThread();
+    storeHeartbeat.run();
+    capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
+    checkClusterMetricsInfo(capability, 0);
+
+    // heartbeat again after adding a node.
+    rm.registerNode("127.0.0.1:1234", 4 * 1024);
+    storeHeartbeat.run();
+    capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
+    checkClusterMetricsInfo(capability, 1);
+
+    // Validate sub-cluster deregistration
+    rm.getFederationStateStoreService()
+        .deregisterSubCluster(SubClusterDeregisterRequest
+            .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
+    checkSubClusterInfo(SubClusterState.SC_UNREGISTERED);
+
+    // check after failover
+    explicitFailover(rm);
+
+    capability = checkSubClusterInfo(SubClusterState.SC_NEW);
+    Assert.assertTrue(capability.isEmpty());
+
+    // Heartbeat to see if sub-cluster transitions to running
+    storeHeartbeat =
+        rm.getFederationStateStoreService().getStateStoreHeartbeatThread();
+    storeHeartbeat.run();
+    capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
+    checkClusterMetricsInfo(capability, 0);
+
+    // heartbeat again after adding a node.
+    rm.registerNode("127.0.0.1:1234", 4 * 1024);
+    storeHeartbeat.run();
+    capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
+    checkClusterMetricsInfo(capability, 1);
+
+    rm.stop();
+  }
+
+  private void explicitFailover(MockRM rm) throws IOException {
+    rm.getAdminService().transitionToStandby(requestInfo);
+    Assert.assertTrue(rm.getRMContext()
+        .getHAServiceState() == HAServiceProtocol.HAServiceState.STANDBY);
+    rm.getAdminService().transitionToActive(requestInfo);
+    Assert.assertTrue(rm.getRMContext()
+        .getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE);
+    lastHearbeatTS = 0;
+    stateStore = rm.getFederationStateStoreService().getStateStoreClient();
+  }
+
+  private void checkClusterMetricsInfo(String capability, int numNodes)
+      throws JAXBException {
+    ClusterMetricsInfo clusterMetricsInfo = unmarshaller.unmarshalFromJSON(
+        new StringReader(capability), ClusterMetricsInfo.class);
+    Assert.assertEquals(numNodes, clusterMetricsInfo.getTotalNodes());
+  }
+
+  private String checkSubClusterInfo(SubClusterState state)
+      throws YarnException {
+    Assert.assertNotNull(stateStore.getSubCluster(request));
+    SubClusterInfo response =
+        stateStore.getSubCluster(request).getSubClusterInfo();
+    Assert.assertEquals(state, response.getState());
+    Assert.assertTrue(response.getLastHeartBeat() >= lastHearbeatTS);
+    lastHearbeatTS = response.getLastHeartBeat();
+    return response.getCapability();
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to