http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java
new file mode 100644
index 0000000..5dfb356
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.util.Time.now;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to periodically check if the {@link org.apache.hadoop.hdfs.server.
+ * federation.store.StateStoreService StateStoreService} cached information in
+ * the {@link Router} is up to date. This is for performance and removes the
+ * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService
+ * StateStoreService} from the critical path in common operations.
+ */
+public class RouterSafemodeService extends PeriodicService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterSafemodeService.class);
+
+  /** Router to manage safe mode. */
+  private final Router router;
+
+  /** Interval in ms to wait post startup before allowing RPC requests. */
+  private long startupInterval;
+  /** Interval in ms after which the State Store cache is too stale. */
+  private long staleInterval;
+  /** Start time in ms of this service. */
+  private long startupTime;
+
+  /** The time the Router enters safe mode in milliseconds. */
+  private long enterSafeModeTime = now();
+
+
+  /**
+   * Create a new Cache update service.
+   *
+   * @param router Router containing the cache.
+   */
+  public RouterSafemodeService(Router router) {
+    super(RouterSafemodeService.class.getSimpleName());
+    this.router = router;
+  }
+
+  /**
+   * Enter safe mode.
+   */
+  private void enter() {
+    LOG.info("Entering safe mode");
+    enterSafeModeTime = now();
+    RouterRpcServer rpcServer = router.getRpcServer();
+    rpcServer.setSafeMode(true);
+    router.updateRouterState(RouterServiceState.SAFEMODE);
+  }
+
+  /**
+   * Leave safe mode.
+   */
+  private void leave() {
+    // Cache recently updated, leave safemode
+    long timeInSafemode = now() - enterSafeModeTime;
+    LOG.info("Leaving safe mode after {} milliseconds", timeInSafemode);
+    RouterMetrics routerMetrics = router.getRouterMetrics();
+    if (routerMetrics == null) {
+      LOG.error("The Router metrics are not enabled");
+    } else {
+      routerMetrics.setSafeModeTime(timeInSafemode);
+    }
+    RouterRpcServer rpcServer = router.getRpcServer();
+    rpcServer.setSafeMode(false);
+    router.updateRouterState(RouterServiceState.RUNNING);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+
+    // Use same interval as cache update service
+    this.setIntervalMs(conf.getTimeDuration(
+        RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
+        RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT,
+        TimeUnit.MILLISECONDS));
+
+    this.startupInterval = conf.getTimeDuration(
+        RBFConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION,
+        RBFConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    LOG.info("Leave startup safe mode after {} ms", this.startupInterval);
+
+    this.staleInterval = conf.getTimeDuration(
+        RBFConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION,
+        RBFConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    LOG.info("Enter safe mode after {} ms without reaching the State Store",
+        this.staleInterval);
+
+    this.startupTime = Time.now();
+
+    // Initializing the RPC server in safe mode, it will disable it later
+    enter();
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void periodicInvoke() {
+    long now = Time.now();
+    long delta = now - startupTime;
+    if (delta < startupInterval) {
+      LOG.info("Delaying safemode exit for {} milliseconds...",
+          this.startupInterval - delta);
+      return;
+    }
+    RouterRpcServer rpcServer = router.getRpcServer();
+    StateStoreService stateStore = router.getStateStore();
+    long cacheUpdateTime = stateStore.getCacheUpdateTime();
+    boolean isCacheStale = (now - cacheUpdateTime) > this.staleInterval;
+
+    // Always update to indicate our cache was updated
+    if (isCacheStale) {
+      if (!rpcServer.isInSafeMode()) {
+        enter();
+      }
+    } else if (rpcServer.isInSafeMode()) {
+      // Cache recently updated, leave safe mode
+      leave();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java
new file mode 100644
index 0000000..3accbe9
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+/**
+ * States of the Router.
+ */
+public enum RouterServiceState {
+  UNINITIALIZED,
+  INITIALIZING,
+  SAFEMODE,
+  RUNNING,
+  STOPPING,
+  SHUTDOWN,
+  EXPIRED;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateManager.java
new file mode 100644
index 0000000..527600c
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateManager.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.EnterSafeModeRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.EnterSafeModeResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse;
+
+/**
+ * Interface of managing the Router state.
+ */
+public interface RouterStateManager {
+  /**
+   * Enter safe mode and change Router state to RouterServiceState#SAFEMODE.
+   */
+  EnterSafeModeResponse enterSafeMode(EnterSafeModeRequest request)
+      throws IOException;
+
+  /**
+   * Leave safe mode and change Router state to RouterServiceState#RUNNING.
+   */
+  LeaveSafeModeResponse leaveSafeMode(LeaveSafeModeRequest request)
+      throws IOException;
+
+  /**
+   * Verify if current Router state is safe mode.
+   */
+  GetSafeModeResponse getSafeMode(GetSafeModeRequest request)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/package-info.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/package-info.java
new file mode 100644
index 0000000..327f39b
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/package-info.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The router package includes the core services for a HDFS federation router.
+ * The {@link Router} acts as a transparent proxy in front of a cluster of
+ * multiple NameNodes and nameservices. The {@link RouterRpcServer} exposes the
+ * NameNode clientProtocol and is the primary contact point for DFS clients in 
a
+ * federated cluster.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
new file mode 100644
index 0000000..cdd4449
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Record store that takes care of caching the records in memory.
+ *
+ * @param <R> Record to store by this interface.
+ */
+public abstract class CachedRecordStore<R extends BaseRecord>
+    extends RecordStore<R> implements StateStoreCache {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CachedRecordStore.class);
+
+
+  /** Prevent loading the cache more than once every 500 ms. */
+  private static final long MIN_UPDATE_MS = 500;
+
+
+  /** Cached entries. */
+  private List<R> records = new ArrayList<>();
+
+  /** Time stamp of the cached entries. */
+  private long timestamp = -1;
+
+  /** If the cache is initialized. */
+  private boolean initialized = false;
+
+  /** Last time the cache was updated. */
+  private long lastUpdate = -1;
+
+  /** Lock to access the memory cache. */
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
+  /** If it should override the expired values when loading the cache. */
+  private boolean override = false;
+
+
+  /**
+   * Create a new cached record store.
+   *
+   * @param clazz Class of the record to store.
+   * @param driver State Store driver.
+   */
+  protected CachedRecordStore(Class<R> clazz, StateStoreDriver driver) {
+    this(clazz, driver, false);
+  }
+
+  /**
+   * Create a new cached record store.
+   *
+   * @param clazz Class of the record to store.
+   * @param driver State Store driver.
+   * @param over If the entries should be override if they expire
+   */
+  protected CachedRecordStore(
+      Class<R> clazz, StateStoreDriver driver, boolean over) {
+    super(clazz, driver);
+
+    this.override = over;
+  }
+
+  /**
+   * Check that the cache of the State Store information is available.
+   *
+   * @throws StateStoreUnavailableException If the cache is not initialized.
+   */
+  private void checkCacheAvailable() throws StateStoreUnavailableException {
+    if (!this.initialized) {
+      throw new StateStoreUnavailableException(
+          "Cached State Store not initialized, " +
+          getRecordClass().getSimpleName() + " records not valid");
+    }
+  }
+
+  @Override
+  public boolean loadCache(boolean force) throws IOException {
+    // Prevent loading the cache too frequently
+    if (force || isUpdateTime()) {
+      List<R> newRecords = null;
+      long t = -1;
+      try {
+        QueryResult<R> result = getDriver().get(getRecordClass());
+        newRecords = result.getRecords();
+        t = result.getTimestamp();
+
+        // If we have any expired record, update the State Store
+        if (this.override) {
+          overrideExpiredRecords(result);
+        }
+      } catch (IOException e) {
+        LOG.error("Cannot get \"{}\" records from the State Store",
+            getRecordClass().getSimpleName());
+        this.initialized = false;
+        return false;
+      }
+
+      // Update cache atomically
+      writeLock.lock();
+      try {
+        this.records.clear();
+        this.records.addAll(newRecords);
+        this.timestamp = t;
+        this.initialized = true;
+      } finally {
+        writeLock.unlock();
+      }
+
+      // Update the metrics for the cache State Store size
+      StateStoreMetrics metrics = getDriver().getMetrics();
+      if (metrics != null) {
+        String recordName = getRecordClass().getSimpleName();
+        metrics.setCacheSize(recordName, this.records.size());
+      }
+
+      lastUpdate = Time.monotonicNow();
+    }
+    return true;
+  }
+
+  /**
+   * Check if it's time to update the cache. Update it it was never updated.
+   *
+   * @return If it's time to update this cache.
+   */
+  private boolean isUpdateTime() {
+    return Time.monotonicNow() - lastUpdate > MIN_UPDATE_MS;
+  }
+
+  /**
+   * Updates the state store with any record overrides we detected, such as an
+   * expired state.
+   *
+   * @param query RecordQueryResult containing the data to be inspected.
+   * @throws IOException
+   */
+  public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
+    List<R> commitRecords = new ArrayList<>();
+    List<R> newRecords = query.getRecords();
+    long currentDriverTime = query.getTimestamp();
+    if (newRecords == null || currentDriverTime <= 0) {
+      LOG.error("Cannot check overrides for record");
+      return;
+    }
+    for (R record : newRecords) {
+      if (record.checkExpired(currentDriverTime)) {
+        String recordName = StateStoreUtils.getRecordName(record.getClass());
+        LOG.info("Override State Store record {}: {}", recordName, record);
+        commitRecords.add(record);
+      }
+    }
+    if (commitRecords.size() > 0) {
+      getDriver().putAll(commitRecords, true, false);
+    }
+  }
+
+  /**
+   * Updates the state store with any record overrides we detected, such as an
+   * expired state.
+   *
+   * @param record Record record to be updated.
+   * @throws IOException
+   */
+  public void overrideExpiredRecord(R record) throws IOException {
+    List<R> newRecords = Collections.singletonList(record);
+    long time = getDriver().getTime();
+    QueryResult<R> query = new QueryResult<>(newRecords, time);
+    overrideExpiredRecords(query);
+  }
+
+  /**
+   * Get all the cached records.
+   *
+   * @return Copy of the cached records.
+   * @throws StateStoreUnavailableException If the State store is not 
available.
+   */
+  public List<R> getCachedRecords() throws StateStoreUnavailableException {
+    checkCacheAvailable();
+
+    List<R> ret = new LinkedList<R>();
+    this.readLock.lock();
+    try {
+      ret.addAll(this.records);
+    } finally {
+      this.readLock.unlock();
+    }
+    return ret;
+  }
+
+  /**
+   * Get all the cached records and the time stamp of the cache.
+   *
+   * @return Copy of the cached records and the time stamp.
+   * @throws StateStoreUnavailableException If the State store is not 
available.
+   */
+  protected QueryResult<R> getCachedRecordsAndTimeStamp()
+      throws StateStoreUnavailableException {
+    checkCacheAvailable();
+
+    this.readLock.lock();
+    try {
+      return new QueryResult<R>(this.records, this.timestamp);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java
new file mode 100644
index 0000000..3e8ba6b
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+
+/**
+ * Management API for NameNode registrations stored in
+ * {@link 
org.apache.hadoop.hdfs.server.federation.store.records.MembershipState
+ * MembershipState} records. The {@link org.apache.hadoop.hdfs.server.
+ * federation.router.RouterHeartbeatService RouterHeartbeatService} 
periodically
+ * polls each NN to update the NameNode metadata(addresses, operational) and HA
+ * state(active, standby). Each NameNode may be polled by multiple
+ * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router}
+ * instances.
+ * <p>
+ * Once fetched from the
+ * {@link 
org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
+ * StateStoreDriver}, NameNode registrations are cached until the next query.
+ * The fetched registration data is aggregated using a quorum to determine the
+ * best/most accurate state for each NameNode. The cache is periodically 
updated
+ * by the @{link StateStoreCacheUpdateService}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class MembershipStore
+    extends CachedRecordStore<MembershipState> {
+
+  protected MembershipStore(StateStoreDriver driver) {
+    super(MembershipState.class, driver, true);
+  }
+
+  /**
+   * Inserts or updates a namenode membership entry into the table.
+   *
+   * @param request Fully populated NamenodeHeartbeatRequest request.
+   * @return True if successful, false otherwise.
+   * @throws StateStoreUnavailableException Throws exception if the data store
+   *           is not initialized.
+   * @throws IOException if the data store could not be queried or the query is
+   *           invalid.
+   */
+  public abstract NamenodeHeartbeatResponse namenodeHeartbeat(
+      NamenodeHeartbeatRequest request) throws IOException;
+
+  /**
+   * Queries for a single cached registration entry matching the given
+   * parameters. Possible keys are the names of data structure elements 
Possible
+   * values are matching SQL "LIKE" targets.
+   *
+   * @param request Fully populated GetNamenodeRegistrationsRequest request.
+   * @return Single matching FederationMembershipStateEntry or null if not 
found
+   *         or more than one entry matches.
+   * @throws StateStoreUnavailableException Throws exception if the data store
+   *           is not initialized.
+   * @throws IOException if the data store could not be queried or the query is
+   *           invalid.
+   */
+  public abstract GetNamenodeRegistrationsResponse getNamenodeRegistrations(
+      GetNamenodeRegistrationsRequest request) throws IOException;
+
+  /**
+   * Get the expired registrations from the registration cache.
+   *
+   * @return Expired registrations or zero-length list if none are found.
+   * @throws StateStoreUnavailableException Throws exception if the data store
+   *           is not initialized.
+   * @throws IOException if the data store could not be queried or the query is
+   *           invalid.
+   */
+  public abstract GetNamenodeRegistrationsResponse
+      getExpiredNamenodeRegistrations(GetNamenodeRegistrationsRequest request)
+          throws IOException;
+
+  /**
+   * Retrieves a list of registered nameservices and their associated info.
+   *
+   * @param request
+   * @return Collection of information for each registered nameservice.
+   * @throws IOException if the data store could not be queried or the query is
+   *           invalid.
+   */
+  public abstract GetNamespaceInfoResponse getNamespaceInfo(
+      GetNamespaceInfoRequest request) throws IOException;
+
+  /**
+   * Overrides a cached namenode state with an updated state.
+   *
+   * @param request Fully populated OverrideNamenodeRegistrationRequest 
request.
+   * @return OverrideNamenodeRegistrationResponse
+   * @throws StateStoreUnavailableException if the data store is not
+   *           initialized.
+   * @throws IOException if the data store could not be queried or the query is
+   *           invalid.
+   */
+  public abstract UpdateNamenodeRegistrationResponse 
updateNamenodeRegistration(
+      UpdateNamenodeRegistrationRequest request) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java
new file mode 100644
index 0000000..b439659
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+
+/**
+ * Management API for the HDFS mount table information stored in
+ * {@link org.apache.hadoop.hdfs.server.federation.store.records.MountTable
+ * MountTable} records. The mount table contains entries that map a particular
+ * global namespace path one or more HDFS nameservices (NN) + target path. It 
is
+ * possible to map mount locations for root folders, directories or individual
+ * files.
+ * <p>
+ * Once fetched from the
+ * {@link 
org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
+ * StateStoreDriver}, MountTable records are cached in a tree for faster 
access.
+ * Each path in the global namespace is mapped to a nameserivce ID and local
+ * path upon request. The cache is periodically updated by the @{link
+ * StateStoreCacheUpdateService}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class MountTableStore extends CachedRecordStore<MountTable>
+    implements MountTableManager {
+
+  public MountTableStore(StateStoreDriver driver) {
+    super(MountTable.class, driver);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java
new file mode 100644
index 0000000..53a8b82
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+
+/**
+ * Store records in the State Store. Subclasses provide interfaces to operate 
on
+ * those records.
+ *
+ * @param <R> Record to store by this interface.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class RecordStore<R extends BaseRecord> {
+
+  private static final Log LOG = LogFactory.getLog(RecordStore.class);
+
+
+  /** Class of the record stored in this State Store. */
+  private final Class<R> recordClass;
+
+  /** State store driver backed by persistent storage. */
+  private final StateStoreDriver driver;
+
+
+  /**
+   * Create a new store for records.
+   *
+   * @param clazz Class of the record to store.
+   * @param stateStoreDriver Driver for the State Store.
+   */
+  protected RecordStore(Class<R> clazz, StateStoreDriver stateStoreDriver) {
+    this.recordClass = clazz;
+    this.driver = stateStoreDriver;
+  }
+
+  /**
+   * Report a required record to the data store. The data store uses this to
+   * create/maintain storage for the record.
+   *
+   * @return The class of the required record or null if no record is required
+   *         for this interface.
+   */
+  public Class<R> getRecordClass() {
+    return this.recordClass;
+  }
+
+  /**
+   * Get the State Store driver.
+   *
+   * @return State Store driver.
+   */
+  protected StateStoreDriver getDriver() {
+    return this.driver;
+  }
+
+  /**
+   * Build a state store API implementation interface.
+   *
+   * @param clazz The specific interface implementation to create
+   * @param driver The {@link StateStoreDriver} implementation in use.
+   * @return An initialized instance of the specified state store API
+   *         implementation.
+   */
+  public static <T extends RecordStore<?>> T newInstance(
+      final Class<T> clazz, final StateStoreDriver driver) {
+
+    try {
+      Constructor<T> constructor = 
clazz.getConstructor(StateStoreDriver.class);
+      T recordStore = constructor.newInstance(driver);
+      return recordStore;
+    } catch (Exception e) {
+      LOG.error("Cannot create new instance for " + clazz, e);
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RouterStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RouterStore.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RouterStore.java
new file mode 100644
index 0000000..c6a0dad
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RouterStore.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+
+/**
+ * Management API for
+ * {@link org.apache.hadoop.hdfs.server.federation.store.records.RouterState
+ *  RouterState} records in the state store. Accesses the data store via the
+ * {@link org.apache.hadoop.hdfs.server.federation.store.driver.
+ * StateStoreDriver StateStoreDriver} interface. No data is cached.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class RouterStore extends CachedRecordStore<RouterState> {
+
+  public RouterStore(StateStoreDriver driver) {
+    super(RouterState.class, driver, true);
+  }
+
+  /**
+   * Fetches the current router state object.
+   *
+   * @param request Fully populated request object.
+   * @return The matching router record or null if none exists.
+   * @throws IOException Throws exception if unable to query the data store or
+   *           if more than one matching record is found.
+   */
+  public abstract GetRouterRegistrationResponse getRouterRegistration(
+      GetRouterRegistrationRequest request) throws IOException;
+
+  /**
+   * Fetches all router status objects.
+   *
+   * @param request Fully populated request object.
+   * @return List of Router records present in the data store.
+   * @throws IOException Throws exception if unable to query the data store
+   */
+  public abstract GetRouterRegistrationsResponse getRouterRegistrations(
+      GetRouterRegistrationsRequest request) throws IOException;
+
+  /**
+   * Update the state of this router in the State Store.
+   *
+   * @param request Fully populated request object.
+   * @return True if the update was successfully recorded, false otherwise.
+   * @throws IOException Throws exception if unable to query the data store
+   */
+  public abstract RouterHeartbeatResponse routerHeartbeat(
+      RouterHeartbeatRequest request) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java
new file mode 100644
index 0000000..83fc501
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.io.IOException;
+
+/**
+ * Interface for a cached copy of the State Store.
+ */
+public interface StateStoreCache {
+
+  /**
+   * Load the cache from the State Store. Called by the cache update service
+   * when the data has been reloaded.
+   *
+   * @param force If we force the load.
+   * @return If the cache was loaded successfully.
+   * @throws IOException If there was an error loading the cache.
+   */
+  boolean loadCache(boolean force) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
new file mode 100644
index 0000000..6a4dc76
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.router.PeriodicService;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to periodically update the {@link StateStoreService}
+ * cached information in the
+ * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router}.
+ * This is for performance and removes the State Store from the critical path
+ * in common operations.
+ */
+public class StateStoreCacheUpdateService extends PeriodicService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreCacheUpdateService.class);
+
+  /** The service that manages the State Store connection. */
+  private final StateStoreService stateStore;
+
+
+  /**
+   * Create a new Cache update service.
+   *
+   * @param stateStore Implementation of the state store
+   */
+  public StateStoreCacheUpdateService(StateStoreService stateStore) {
+    super(StateStoreCacheUpdateService.class.getSimpleName());
+    this.stateStore = stateStore;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+
+    this.setIntervalMs(conf.getTimeDuration(
+        RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
+        RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT,
+        TimeUnit.MILLISECONDS));
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void periodicInvoke() {
+    LOG.debug("Updating State Store cache");
+    stateStore.refreshCaches();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java
new file mode 100644
index 0000000..8ebf4b8
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.router.PeriodicService;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to periodically monitor the connection of the StateStore
+ * {@link StateStoreService} data store and to re-open the connection
+ * to the data store if required.
+ */
+public class StateStoreConnectionMonitorService extends PeriodicService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreConnectionMonitorService.class);
+
+  /** Service that maintains the State Store connection. */
+  private final StateStoreService stateStore;
+
+
+  /**
+   * Create a new service to monitor the connectivity of the state store 
driver.
+   *
+   * @param store Instance of the state store to be monitored.
+   */
+  public StateStoreConnectionMonitorService(StateStoreService store) {
+    super(StateStoreConnectionMonitorService.class.getSimpleName());
+    this.stateStore = store;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.setIntervalMs(conf.getLong(
+        RBFConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
+        RBFConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT));
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void periodicInvoke() {
+    LOG.debug("Checking state store connection");
+    if (!stateStore.isDriverReady()) {
+      LOG.info("Attempting to open state store driver.");
+      stateStore.loadDriver();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
new file mode 100644
index 0000000..ccbde09
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMBean;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl;
+import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
+import org.apache.hadoop.hdfs.server.federation.store.impl.RouterStoreImpl;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A service to initialize a
+ * {@link 
org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
+ * StateStoreDriver} and maintain the connection to the data store. There are
+ * multiple state store driver connections supported:
+ * <ul>
+ * <li>File
+ * {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl.
+ * StateStoreFileImpl StateStoreFileImpl}
+ * <li>ZooKeeper
+ * {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl.
+ * StateStoreZooKeeperImpl StateStoreZooKeeperImpl}
+ * </ul>
+ * <p>
+ * The service also supports the dynamic registration of record stores like:
+ * <ul>
+ * <li>{@link MembershipStore}: state of the Namenodes in the
+ * federation.
+ * <li>{@link MountTableStore}: Mount table between to subclusters.
+ * See {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}.
+ * <li>{@link RebalancerStore}: Log of the rebalancing operations.
+ * <li>{@link RouterStore}: Router state in the federation.
+ * <li>{@link TokenStore}: Tokens in the federation.
+ * </ul>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class StateStoreService extends CompositeService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreService.class);
+
+
+  /** State Store configuration. */
+  private Configuration conf;
+
+  /** Identifier for the service. */
+  private String identifier;
+
+  /** Driver for the back end connection. */
+  private StateStoreDriver driver;
+
+  /** Service to maintain data store connection. */
+  private StateStoreConnectionMonitorService monitorService;
+
+  /** StateStore metrics. */
+  private StateStoreMetrics metrics;
+
+  /** Supported record stores. */
+  private final Map<
+      Class<? extends BaseRecord>, RecordStore<? extends BaseRecord>>
+          recordStores;
+
+  /** Service to maintain State Store caches. */
+  private StateStoreCacheUpdateService cacheUpdater;
+  /** Time the cache was last successfully updated. */
+  private long cacheLastUpdateTime;
+  /** List of internal caches to update. */
+  private final List<StateStoreCache> cachesToUpdateInternal;
+  /** List of external caches to update. */
+  private final List<StateStoreCache> cachesToUpdateExternal;
+
+
+  public StateStoreService() {
+    super(StateStoreService.class.getName());
+
+    // Records and stores supported by this implementation
+    this.recordStores = new HashMap<>();
+
+    // Caches to maintain
+    this.cachesToUpdateInternal = new ArrayList<>();
+    this.cachesToUpdateExternal = new ArrayList<>();
+  }
+
+  /**
+   * Initialize the State Store and the connection to the backend.
+   *
+   * @param config Configuration for the State Store.
+   * @throws IOException
+   */
+  @Override
+  protected void serviceInit(Configuration config) throws Exception {
+    this.conf = config;
+
+    // Create implementation of State Store
+    Class<? extends StateStoreDriver> driverClass = this.conf.getClass(
+        RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
+        RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT,
+        StateStoreDriver.class);
+    this.driver = ReflectionUtils.newInstance(driverClass, this.conf);
+
+    if (this.driver == null) {
+      throw new IOException("Cannot create driver for the State Store");
+    }
+
+    // Add supported record stores
+    addRecordStore(MembershipStoreImpl.class);
+    addRecordStore(MountTableStoreImpl.class);
+    addRecordStore(RouterStoreImpl.class);
+
+    // Check the connection to the State Store periodically
+    this.monitorService = new StateStoreConnectionMonitorService(this);
+    this.addService(monitorService);
+
+    // Set expirations intervals for each record
+    MembershipState.setExpirationMs(conf.getLong(
+        RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
+        RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT));
+
+    RouterState.setExpirationMs(conf.getTimeDuration(
+        RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS,
+        RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT,
+        TimeUnit.MILLISECONDS));
+
+    // Cache update service
+    this.cacheUpdater = new StateStoreCacheUpdateService(this);
+    addService(this.cacheUpdater);
+
+    // Create metrics for the State Store
+    this.metrics = StateStoreMetrics.create(conf);
+
+    // Adding JMX interface
+    try {
+      StandardMBean bean = new StandardMBean(metrics, StateStoreMBean.class);
+      ObjectName registeredObject =
+          MBeans.register("Router", "StateStore", bean);
+      LOG.info("Registered StateStoreMBean: {}", registeredObject);
+    } catch (NotCompliantMBeanException e) {
+      throw new RuntimeException("Bad StateStoreMBean setup", e);
+    } catch (MetricsException e) {
+      LOG.info("Failed to register State Store bean {}", e.getMessage());
+    }
+
+    super.serviceInit(this.conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    loadDriver();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    closeDriver();
+
+    if (metrics != null) {
+      metrics.shutdown();
+      metrics = null;
+    }
+
+    super.serviceStop();
+  }
+
+  /**
+   * Add a record store to the State Store. It includes adding the store, the
+   * supported record and the cache management.
+   *
+   * @param clazz Class of the record store to track.
+   * @return New record store.
+   * @throws ReflectiveOperationException
+   */
+  private <T extends RecordStore<?>> void addRecordStore(
+      final Class<T> clazz) throws ReflectiveOperationException {
+
+    assert this.getServiceState() == STATE.INITED :
+        "Cannot add record to the State Store once started";
+
+    T recordStore = RecordStore.newInstance(clazz, this.getDriver());
+    Class<? extends BaseRecord> recordClass = recordStore.getRecordClass();
+    this.recordStores.put(recordClass, recordStore);
+
+    // Subscribe for cache updates
+    if (recordStore instanceof StateStoreCache) {
+      StateStoreCache cachedRecordStore = (StateStoreCache) recordStore;
+      this.cachesToUpdateInternal.add(cachedRecordStore);
+    }
+  }
+
+  /**
+   * Get the record store in this State Store for a given interface.
+   *
+   * @param recordStoreClass Class of the record store.
+   * @return Registered record store or null if not found.
+   */
+  public <T extends RecordStore<?>> T getRegisteredRecordStore(
+      final Class<T> recordStoreClass) {
+    for (RecordStore<? extends BaseRecord> recordStore :
+        this.recordStores.values()) {
+      if (recordStoreClass.isInstance(recordStore)) {
+        @SuppressWarnings("unchecked")
+        T recordStoreChecked = (T) recordStore;
+        return recordStoreChecked;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * List of records supported by this State Store.
+   *
+   * @return List of supported record classes.
+   */
+  public Collection<Class<? extends BaseRecord>> getSupportedRecords() {
+    return this.recordStores.keySet();
+  }
+
+  /**
+   * Load the State Store driver. If successful, refresh cached data tables.
+   */
+  public void loadDriver() {
+    synchronized (this.driver) {
+      if (!isDriverReady()) {
+        String driverName = this.driver.getClass().getSimpleName();
+        if (this.driver.init(
+            conf, getIdentifier(), getSupportedRecords(), metrics)) {
+          LOG.info("Connection to the State Store driver {} is open and ready",
+              driverName);
+          this.refreshCaches();
+        } else {
+          LOG.error("Cannot initialize State Store driver {}", driverName);
+        }
+      }
+    }
+  }
+
+  /**
+   * Check if the driver is ready to be used.
+   *
+   * @return If the driver is ready.
+   */
+  public boolean isDriverReady() {
+    return this.driver.isDriverReady();
+  }
+
+  /**
+   * Manually shuts down the driver.
+   *
+   * @throws Exception If the driver cannot be closed.
+   */
+  @VisibleForTesting
+  public void closeDriver() throws Exception {
+    if (this.driver != null) {
+      this.driver.close();
+    }
+  }
+
+  /**
+   * Get the state store driver.
+   *
+   * @return State store driver.
+   */
+  public StateStoreDriver getDriver() {
+    return this.driver;
+  }
+
+  /**
+   * Fetch a unique identifier for this state store instance. Typically it is
+   * the address of the router.
+   *
+   * @return Unique identifier for this store.
+   */
+  public String getIdentifier() {
+    return this.identifier;
+  }
+
+  /**
+   * Set a unique synchronization identifier for this store.
+   *
+   * @param id Unique identifier, typically the router's RPC address.
+   */
+  public void setIdentifier(String id) {
+    this.identifier = id;
+  }
+
+  //
+  // Cached state store data
+  //
+  /**
+   * The last time the state store cache was fully updated.
+   *
+   * @return Timestamp.
+   */
+  public long getCacheUpdateTime() {
+    return this.cacheLastUpdateTime;
+  }
+
+  /**
+   * Stops the cache update service.
+   */
+  @VisibleForTesting
+  public void stopCacheUpdateService() {
+    if (this.cacheUpdater != null) {
+      this.cacheUpdater.stop();
+      removeService(this.cacheUpdater);
+      this.cacheUpdater = null;
+    }
+  }
+
+  /**
+   * Register a cached record store for automatic periodic cache updates.
+   *
+   * @param client Client to the state store.
+   */
+  public void registerCacheExternal(StateStoreCache client) {
+    this.cachesToUpdateExternal.add(client);
+  }
+
+  /**
+   * Refresh the cache with information from the State Store. Called
+   * periodically by the CacheUpdateService to maintain data caches and
+   * versions.
+   */
+  public void refreshCaches() {
+    refreshCaches(false);
+  }
+
+  /**
+   * Refresh the cache with information from the State Store. Called
+   * periodically by the CacheUpdateService to maintain data caches and
+   * versions.
+   * @param force If we force the refresh.
+   */
+  public void refreshCaches(boolean force) {
+    boolean success = true;
+    if (isDriverReady()) {
+      List<StateStoreCache> cachesToUpdate = new LinkedList<>();
+      cachesToUpdate.addAll(cachesToUpdateInternal);
+      cachesToUpdate.addAll(cachesToUpdateExternal);
+      for (StateStoreCache cachedStore : cachesToUpdate) {
+        String cacheName = cachedStore.getClass().getSimpleName();
+        boolean result = false;
+        try {
+          result = cachedStore.loadCache(force);
+        } catch (IOException e) {
+          LOG.error("Error updating cache for {}", cacheName, e);
+          result = false;
+        }
+        if (!result) {
+          success = false;
+          LOG.error("Cache update failed for cache {}", cacheName);
+        }
+      }
+    } else {
+      success = false;
+      LOG.info("Skipping State Store cache update, driver is not ready.");
+    }
+    if (success) {
+      // Uses local time, not driver time.
+      this.cacheLastUpdateTime = Time.now();
+    }
+  }
+
+  /**
+   * Update the cache for a specific record store.
+   *
+   * @param clazz Class of the record store.
+   * @return If the cached was loaded.
+   * @throws IOException if the cache update failed.
+   */
+  public boolean loadCache(final Class<?> clazz) throws IOException {
+    return loadCache(clazz, false);
+  }
+
+  /**
+   * Update the cache for a specific record store.
+   *
+   * @param clazz Class of the record store.
+   * @param force Force the update ignoring cached periods.
+   * @return If the cached was loaded.
+   * @throws IOException if the cache update failed.
+   */
+  public boolean loadCache(Class<?> clazz, boolean force) throws IOException {
+    List<StateStoreCache> cachesToUpdate =
+        new LinkedList<StateStoreCache>();
+    cachesToUpdate.addAll(this.cachesToUpdateInternal);
+    cachesToUpdate.addAll(this.cachesToUpdateExternal);
+    for (StateStoreCache cachedStore : cachesToUpdate) {
+      if (clazz.isInstance(cachedStore)) {
+        return cachedStore.loadCache(force);
+      }
+    }
+    throw new IOException("Registered cache was not found for " + clazz);
+  }
+
+  /**
+   * Get the metrics for the State Store.
+   *
+   * @return State Store metrics.
+   */
+  public StateStoreMetrics getMetrics() {
+    return metrics;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUnavailableException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUnavailableException.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUnavailableException.java
new file mode 100644
index 0000000..4e6f8c8
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUnavailableException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.io.IOException;
+
+/**
+ * Thrown when the state store is not reachable or available. Cached APIs and
+ * queries may succeed. Client should retry again later.
+ */
+public class StateStoreUnavailableException extends IOException {
+
+  private static final long serialVersionUID = 1L;
+
+  public StateStoreUnavailableException(String msg) {
+    super(msg);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
new file mode 100644
index 0000000..0a36619
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Set of utility functions used to work with the State Store.
+ */
+public final class StateStoreUtils {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreUtils.class);
+
+
+  private StateStoreUtils() {
+    // Utility class
+  }
+
+  /**
+   * Get the base class for a record class. If we get an implementation of a
+   * record we will return the real parent record class.
+   *
+   * @param clazz Class of the data record to check.
+   * @return Base class for the record.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T extends BaseRecord>
+      Class<? extends BaseRecord> getRecordClass(final Class<T> clazz) {
+
+    // We ignore the Impl classes and go to the super class
+    Class<? extends BaseRecord> actualClazz = clazz;
+    while (actualClazz.getSimpleName().endsWith("Impl")) {
+      actualClazz = (Class<? extends BaseRecord>) actualClazz.getSuperclass();
+    }
+
+    // Check if we went too far
+    if (actualClazz.equals(BaseRecord.class)) {
+      LOG.error("We went too far ({}) with {}", actualClazz, clazz);
+      actualClazz = clazz;
+    }
+    return actualClazz;
+  }
+
+  /**
+   * Get the base class for a record. If we get an implementation of a record 
we
+   * will return the real parent record class.
+   *
+   * @param record Record to check its main class.
+   * @return Base class for the record.
+   */
+  public static <T extends BaseRecord>
+      Class<? extends BaseRecord> getRecordClass(final T record) {
+    return getRecordClass(record.getClass());
+  }
+
+  /**
+   * Get the base class name for a record. If we get an implementation of a
+   * record we will return the real parent record class.
+   *
+   * @param clazz Class of the data record to check.
+   * @return Name of the base class for the record.
+   */
+  public static <T extends BaseRecord> String getRecordName(
+      final Class<T> clazz) {
+    return getRecordClass(clazz).getSimpleName();
+  }
+
+  /**
+   * Filters a list of records to find all records matching the query.
+   *
+   * @param query Map of field names and objects to use to filter results.
+   * @param records List of data records to filter.
+   * @return List of all records matching the query (or empty list if none
+   *         match), null if the data set could not be filtered.
+   */
+  public static <T extends BaseRecord> List<T> filterMultiple(
+      final Query<T> query, final Iterable<T> records) {
+
+    List<T> matchingList = new ArrayList<>();
+    for (T record : records) {
+      if (query.matches(record)) {
+        matchingList.add(record);
+      }
+    }
+    return matchingList;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
new file mode 100644
index 0000000..d595a97
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver;
+
+import java.net.InetAddress;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import 
org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Driver class for an implementation of a {@link StateStoreService}
+ * provider. Driver implementations will extend this class and implement some 
of
+ * the default methods.
+ */
+public abstract class StateStoreDriver implements StateStoreRecordOperations {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreDriver.class);
+
+
+  /** State Store configuration. */
+  private Configuration conf;
+
+  /** Identifier for the driver. */
+  private String identifier;
+
+  /** State Store metrics. */
+  private StateStoreMetrics metrics;
+
+
+  /**
+   * Initialize the state store connection.
+   *
+   * @param config Configuration for the driver.
+   * @param id Identifier for the driver.
+   * @param records Records that are supported.
+   * @return If initialized and ready, false if failed to initialize driver.
+   */
+  public boolean init(final Configuration config, final String id,
+      final Collection<Class<? extends BaseRecord>> records,
+      final StateStoreMetrics stateStoreMetrics) {
+
+    this.conf = config;
+    this.identifier = id;
+    this.metrics = stateStoreMetrics;
+
+    if (this.identifier == null) {
+      LOG.warn("The identifier for the State Store connection is not set");
+    }
+
+    boolean success = initDriver();
+    if (!success) {
+      LOG.error("Cannot initialize driver for {}", getDriverName());
+      return false;
+    }
+
+    for (Class<? extends BaseRecord> cls : records) {
+      String recordString = StateStoreUtils.getRecordName(cls);
+      if (!initRecordStorage(recordString, cls)) {
+        LOG.error("Cannot initialize record store for {}", 
cls.getSimpleName());
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Get the State Store configuration.
+   *
+   * @return Configuration for the State Store.
+   */
+  protected Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Gets a unique identifier for the running task/process. Typically the
+   * router address.
+   *
+   * @return Unique identifier for the running task.
+   */
+  public String getIdentifier() {
+    return this.identifier;
+  }
+
+  /**
+   * Get the metrics for the State Store.
+   *
+   * @return State Store metrics.
+   */
+  public StateStoreMetrics getMetrics() {
+    return this.metrics;
+  }
+
+  /**
+   * Prepare the driver to access data storage.
+   *
+   * @return True if the driver was successfully initialized. If false is
+   *         returned, the state store will periodically attempt to
+   *         re-initialize the driver and the router will remain in safe mode
+   *         until the driver is initialized.
+   */
+  public abstract boolean initDriver();
+
+  /**
+   * Initialize storage for a single record class.
+   *
+   * @param className String reference of the record class to initialize,
+   *                  used to construct paths and file names for the record.
+   *                  Determined by configuration settings for the specific
+   *                  driver.
+   * @param clazz Record type corresponding to the provided name.
+   * @return True if successful, false otherwise.
+   */
+  public abstract <T extends BaseRecord> boolean initRecordStorage(
+      String className, Class<T> clazz);
+
+  /**
+   * Check if the driver is currently running and the data store connection is
+   * valid.
+   *
+   * @return True if the driver is initialized and the data store is ready.
+   */
+  public abstract boolean isDriverReady();
+
+  /**
+   * Check if the driver is ready to be used and throw an exception otherwise.
+   *
+   * @throws StateStoreUnavailableException If the driver is not ready.
+   */
+  public void verifyDriverReady() throws StateStoreUnavailableException {
+    if (!isDriverReady()) {
+      String driverName = getDriverName();
+      String hostname = getHostname();
+      throw new StateStoreUnavailableException("State Store driver " +
+          driverName + " in " + hostname + " is not ready.");
+    }
+  }
+
+  /**
+   * Close the State Store driver connection.
+   */
+  public abstract void close() throws Exception;
+
+  /**
+   * Returns the current time synchronization from the underlying store.
+   * Override for stores that supply a current date. The data store driver is
+   * responsible for maintaining the official synchronization time/date for all
+   * distributed components.
+   *
+   * @return Current time stamp, used for all synchronization dates.
+   */
+  public long getTime() {
+    return Time.now();
+  }
+
+  /**
+   * Get the name of the driver implementation for debugging.
+   *
+   * @return Name of the driver implementation.
+   */
+  private String getDriverName() {
+    return this.getClass().getSimpleName();
+  }
+
+  /**
+   * Get the host name of the machine running the driver for debugging.
+   *
+   * @return Host name of the machine running the driver.
+   */
+  private String getHostname() {
+    String hostname = "Unknown";
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+    } catch (Exception e) {
+      LOG.error("Cannot get local address", e);
+    }
+    return hostname;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
new file mode 100644
index 0000000..443d46e
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.apache.hadoop.io.retry.AtMostOnce;
+import org.apache.hadoop.io.retry.Idempotent;
+
+/**
+ * Operations for a driver to manage records in the State Store.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface StateStoreRecordOperations {
+
+  /**
+   * Get all records of the requested record class from the data store. To use
+   * the default implementations in this class, getAll must return new 
instances
+   * of the records on each call. It is recommended to override the default
+   * implementations for better performance.
+   *
+   * @param clazz Class of record to fetch.
+   * @return List of all records that match the clazz.
+   * @throws IOException Throws exception if unable to query the data store.
+   */
+  @Idempotent
+  <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException;
+
+  /**
+   * Get a single record from the store that matches the query.
+   *
+   * @param clazz Class of record to fetch.
+   * @param query Query to filter results.
+   * @return A single record matching the query. Null if there are no matching
+   *         records or more than one matching record in the store.
+   * @throws IOException If multiple records match or if the data store cannot
+   *           be queried.
+   */
+  @Idempotent
+  <T extends BaseRecord> T get(Class<T> clazz, Query<T> query)
+      throws IOException;
+
+  /**
+   * Get multiple records from the store that match a query. This method
+   * assumes the underlying driver does not support filtering. If the driver
+   * supports filtering it should overwrite this method.
+   *
+   * @param clazz Class of record to fetch.
+   * @param query Query to filter results.
+   * @return Records of type clazz that match the query or empty list if none
+   *         are found.
+   * @throws IOException Throws exception if unable to query the data store.
+   */
+  @Idempotent
+  <T extends BaseRecord> List<T> getMultiple(
+      Class<T> clazz, Query<T> query) throws IOException;
+
+  /**
+   * Creates a single record. Optionally updates an existing record with same
+   * primary key.
+   *
+   * @param record The record to insert or update.
+   * @param allowUpdate True if update of exiting record is allowed.
+   * @param errorIfExists True if an error should be returned when inserting
+   *          an existing record. Only used if allowUpdate = false.
+   * @return True if the operation was successful.
+   *
+   * @throws IOException Throws exception if unable to query the data store.
+   */
+  @AtMostOnce
+  <T extends BaseRecord> boolean put(
+       T record, boolean allowUpdate, boolean errorIfExists) throws 
IOException;
+
+  /**
+   * Creates multiple records. Optionally updates existing records that have
+   * the same primary key.
+   *
+   * @param records List of data records to update or create. All records must
+   *                be of class clazz.
+   * @param clazz Record class of records.
+   * @param allowUpdate True if update of exiting record is allowed.
+   * @param errorIfExists True if an error should be returned when inserting
+   *          an existing record. Only used if allowUpdate = false.
+   * @return true if all operations were successful.
+   *
+   * @throws IOException Throws exception if unable to query the data store.
+   */
+  @AtMostOnce
+  <T extends BaseRecord> boolean putAll(
+      List<T> records, boolean allowUpdate, boolean errorIfExists)
+          throws IOException;
+
+  /**
+   * Remove a single record.
+   *
+   * @param record Record to be removed.
+   * @return true If the record was successfully removed. False if the record
+   *              could not be removed or not stored.
+   * @throws IOException Throws exception if unable to query the data store.
+   */
+  @AtMostOnce
+  <T extends BaseRecord> boolean remove(T record) throws IOException;
+
+  /**
+   * Remove all records of this class from the store.
+   *
+   * @param clazz Class of records to remove.
+   * @return True if successful.
+   * @throws IOException Throws exception if unable to query the data store.
+   */
+  @AtMostOnce
+  <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException;
+
+  /**
+   * Remove multiple records of a specific class that match a query. Requires
+   * the getAll implementation to fetch fresh records on each call.
+   *
+   * @param query Query to filter what to remove.
+   * @return The number of records removed.
+   * @throws IOException Throws exception if unable to query the data store.
+   */
+  @AtMostOnce
+  <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
+      throws IOException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java
new file mode 100644
index 0000000..666712f
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Serializer to store and retrieve data in the State Store.
+ */
+public abstract class StateStoreSerializer {
+
+  /** Singleton for the serializer instance. */
+  private static StateStoreSerializer defaultSerializer;
+
+  /**
+   * Get the default serializer based.
+   * @return Singleton serializer.
+   */
+  public static StateStoreSerializer getSerializer() {
+    return getSerializer(null);
+  }
+
+  /**
+   * Get a serializer based on the provided configuration.
+   * @param conf Configuration. Default if null.
+   * @return Singleton serializer.
+   */
+  public static StateStoreSerializer getSerializer(Configuration conf) {
+    if (conf == null) {
+      synchronized (StateStoreSerializer.class) {
+        if (defaultSerializer == null) {
+          conf = new Configuration();
+          defaultSerializer = newSerializer(conf);
+        }
+      }
+      return defaultSerializer;
+    } else {
+      return newSerializer(conf);
+    }
+  }
+
+  private static StateStoreSerializer newSerializer(final Configuration conf) {
+    Class<? extends StateStoreSerializer> serializerName = conf.getClass(
+        RBFConfigKeys.FEDERATION_STORE_SERIALIZER_CLASS,
+        RBFConfigKeys.FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT,
+        StateStoreSerializer.class);
+    return ReflectionUtils.newInstance(serializerName, conf);
+  }
+
+  /**
+   * Create a new record.
+   * @param clazz Class of the new record.
+   * @return New record.
+   */
+  public static <T> T newRecord(Class<T> clazz) {
+    return getSerializer(null).newRecordInstance(clazz);
+  }
+
+  /**
+   * Create a new record.
+   * @param clazz Class of the new record.
+   * @return New record.
+   */
+  public abstract <T> T newRecordInstance(Class<T> clazz);
+
+  /**
+   * Serialize a record into a byte array.
+   * @param record Record to serialize.
+   * @return Byte array with the serialized record.
+   */
+  public abstract byte[] serialize(BaseRecord record);
+
+  /**
+   * Serialize a record into a string.
+   * @param record Record to serialize.
+   * @return String with the serialized record.
+   */
+  public abstract String serializeString(BaseRecord record);
+
+  /**
+   * Deserialize a bytes array into a record.
+   * @param byteArray Byte array to deserialize.
+   * @param clazz Class of the record.
+   * @return New record.
+   * @throws IOException If it cannot deserialize the record.
+   */
+  public abstract <T extends BaseRecord> T deserialize(
+      byte[] byteArray, Class<T> clazz) throws IOException;
+
+  /**
+   * Deserialize a string into a record.
+   * @param data String with the data to deserialize.
+   * @param clazz Class of the record.
+   * @return New record.
+   * @throws IOException If it cannot deserialize the record.
+   */
+  public abstract <T extends BaseRecord> T deserialize(
+      String data, Class<T> clazz) throws IOException;
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to