http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
new file mode 100644
index 0000000..1bd35f2
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import static 
org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+
+/**
+ * Base implementation of a State Store driver. It contains default
+ * implementations for the optional functions. These implementations use an
+ * uncached read/write all algorithm for all changes. In most cases it is
+ * recommended to override the optional functions.
+ * <p>
+ * Drivers may optionally override additional routines for performance
+ * optimization, such as custom get/put/remove queries, depending on the
+ * capabilities of the data store.
+ */
+public abstract class StateStoreBaseImpl extends StateStoreDriver {
+
+  /**
+   * Get the single record that matches a query.
+   *
+   * @param clazz Class of the record to fetch.
+   * @param query Query the record must match.
+   * @return The matching record, or null if nothing matches.
+   * @throws IOException If more than one record matches, or if the records
+   *           cannot be fetched.
+   */
+  @Override
+  public <T extends BaseRecord> T get(
+      Class<T> clazz, Query<T> query) throws IOException {
+    final List<T> matches = getMultiple(clazz, query);
+    final int count = matches.size();
+    if (count > 1) {
+      throw new IOException("Found more than one object in collection");
+    }
+    return count == 1 ? matches.get(0) : null;
+  }
+
+  /**
+   * Get every record that matches a query. All the records of the class are
+   * read first and then filtered in memory.
+   *
+   * @param clazz Class of the records to fetch.
+   * @param query Query the records must match.
+   * @return List with the matching records.
+   * @throws IOException If the filtering step produces no list, or if the
+   *           records cannot be read.
+   */
+  @Override
+  public <T extends BaseRecord> List<T> getMultiple(
+      Class<T> clazz, Query<T> query) throws IOException  {
+    final QueryResult<T> everything = get(clazz);
+    final List<T> filtered = filterMultiple(query, everything.getRecords());
+    if (filtered != null) {
+      return filtered;
+    }
+    throw new IOException("Cannot fetch records from the store");
+  }
+
+  /**
+   * Store a single record by delegating to putAll with a one-element list.
+   *
+   * @param record Record to store.
+   * @param allowUpdate Forwarded unchanged to putAll.
+   * @param errorIfExists Forwarded unchanged to putAll.
+   * @return The result reported by putAll.
+   * @throws IOException If putAll fails.
+   */
+  @Override
+  public <T extends BaseRecord> boolean put(
+      T record, boolean allowUpdate, boolean errorIfExists)
+      throws IOException {
+    final List<T> single = new ArrayList<>();
+    single.add(record);
+    return putAll(single, allowUpdate, errorIfExists);
+  }
+
+  /**
+   * Remove a single record. The record itself is used to build the query
+   * that selects what to delete.
+   *
+   * @param record Record to remove.
+   * @return True only if exactly one record was removed.
+   * @throws IOException If the removal fails.
+   */
+  @Override
+  public <T extends BaseRecord> boolean remove(T record) throws IOException {
+    final Query<T> byRecord = new Query<T>(record);
+    // Resolve the record class that the query-based remove expects.
+    @SuppressWarnings("unchecked")
+    final Class<T> type =
+        (Class<T>) StateStoreUtils.getRecordClass(record.getClass());
+    final int removedCount = remove(type, byRecord);
+    return removedCount == 1;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
new file mode 100644
index 0000000..6638d1c
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import static 
org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
+import static org.apache.hadoop.util.Time.monotonicNow;
+import static org.apache.hadoop.util.Time.now;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
+import 
org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * {@link StateStoreDriver} implementation based on files. In this approach, we
+ * use temporary files for the writes and renaming "atomically" to the final
+ * value. Instead of writing to the final location, it will go to a temporary
+ * one and then rename to the final destination.
+ */
+public abstract class StateStoreFileBaseImpl
+    extends StateStoreSerializableImpl {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreFileBaseImpl.class);
+
+  /** File extension for temporary files. */
+  private static final String TMP_MARK = ".tmp";
+  /** We remove temporary files older than 10 seconds. */
+  private static final long OLD_TMP_RECORD_MS = TimeUnit.SECONDS.toMillis(10);
+  /**
+   * File pattern for temporary records: file.XYZ.tmp. The captured digits
+   * are the timestamp that putAll appends when it creates the temporary
+   * file; isOldTempRecord compares it against the current time.
+   */
+  private static final Pattern OLD_TMP_RECORD_PATTERN =
+      Pattern.compile(".+\\.(\\d+)\\.tmp");
+
+  /**
+   * If it is initialized. Set through setInitialized and reported by
+   * isDriverReady.
+   */
+  private boolean initialized = false;
+
+
+  /**
+   * Get the reader of a record for the file system.
+   * <p>
+   * The local file and file system implementations log the error and return
+   * null when the stream cannot be opened, so callers must handle null.
+   *
+   * @param path Path of the record to read.
+   * @return Reader for the record; may be null if it cannot be opened.
+   */
+  protected abstract <T extends BaseRecord> BufferedReader getReader(
+      String path);
+
+  /**
+   * Get the writer of a record for the file system.
+   * <p>
+   * The local file and file system implementations log the error and return
+   * null when the stream cannot be opened, so callers must handle null.
+   *
+   * @param path Path of the record to write.
+   * @return Writer for the record; may be null if it cannot be opened.
+   */
+  protected abstract <T extends BaseRecord> BufferedWriter getWriter(
+      String path);
+
+  /**
+   * Check if a path exists.
+   *
+   * @param path Path to check.
+   * @return If the path exists.
+   */
+  protected abstract boolean exists(String path);
+
+  /**
+   * Make a directory.
+   *
+   * @param path Path of the directory to create.
+   * @return If the directory was created.
+   */
+  protected abstract boolean mkdir(String path);
+
+  /**
+   * Rename a file. This should be atomic. putAll uses it to commit a
+   * temporary file into its final record path, which may already exist.
+   *
+   * @param src Source name.
+   * @param dst Destination name.
+   * @return If the rename was successful.
+   */
+  protected abstract boolean rename(String src, String dst);
+
+  /**
+   * Remove a file.
+   *
+   * @param path Path for the file to remove
+   * @return If the file was removed.
+   */
+  protected abstract boolean remove(String path);
+
+  /**
+   * Get the children for a path.
+   *
+   * @param path Path to check.
+   * @return List with the names (not the full paths) of the children; the
+   *         callers in this class prepend the parent path themselves.
+   */
+  protected abstract List<String> getChildren(String path);
+
+  /**
+   * Get root directory.
+   *
+   * @return Root directory; initDriver treats null as an invalid root.
+   */
+  protected abstract String getRootDir();
+
+  /**
+   * Set the driver as initialized. The flag is what isDriverReady reports;
+   * initDriver sets it to true once the root directory is available.
+   *
+   * @param ini If the driver is initialized.
+   */
+  public void setInitialized(boolean ini) {
+    this.initialized = ini;
+  }
+
+  /**
+   * Initialize the driver by making sure the root directory exists, creating
+   * it when needed. The driver is flagged as initialized only on success.
+   *
+   * @return If the driver was initialized.
+   */
+  @Override
+  public boolean initDriver() {
+    final String root = getRootDir();
+    try {
+      if (root == null) {
+        LOG.error("Invalid root directory, unable to initialize driver.");
+        return false;
+      }
+
+      // Create the root path only when it is missing
+      if (!exists(root) && !mkdir(root)) {
+        LOG.error("Cannot create State Store root directory {}", root);
+        return false;
+      }
+    } catch (Exception ex) {
+      LOG.error(
+          "Cannot initialize filesystem using root directory {}", root, ex);
+      return false;
+    }
+    setInitialized(true);
+    return true;
+  }
+
+  /**
+   * Make sure the data directory for a record type exists under the root
+   * directory, creating it when it is missing.
+   *
+   * @param className Name of the directory for the record type.
+   * @param recordClass Class of the record; not used by this implementation.
+   * @return If the data directory exists or was created.
+   */
+  @Override
+  public <T extends BaseRecord> boolean initRecordStorage(
+      String className, Class<T> recordClass) {
+
+    final String dataDir = getRootDir() + "/" + className;
+    try {
+      if (exists(dataDir)) {
+        return true;
+      }
+      // Create data directories for files
+      LOG.info("{} data directory doesn't exist, creating it", dataDir);
+      if (mkdir(dataDir)) {
+        return true;
+      }
+      LOG.error("Cannot create data directory {}", dataDir);
+      return false;
+    } catch (Exception ex) {
+      LOG.error("Cannot create data directory {}", dataDir, ex);
+      return false;
+    }
+  }
+
+  /**
+   * Read all the records of a class from its data directory. Temporary files
+   * are never parsed as records; the ones that are old enough are deleted
+   * along the way.
+   *
+   * @param clazz Class of the records to read.
+   * @return Records found together with the time reported by the driver.
+   * @throws IOException If the driver is not ready or any record cannot be
+   *           read.
+   */
+  @Override
+  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
+      throws IOException {
+    verifyDriverReady();
+    final long startTime = monotonicNow();
+    final StateStoreMetrics metrics = getMetrics();
+    final List<T> records = new ArrayList<>();
+    try {
+      final String dir = getPathForClass(clazz);
+      for (String name : getChildren(dir)) {
+        final String recordPath = dir + "/" + name;
+        if (!name.endsWith(TMP_MARK)) {
+          records.add(getRecord(recordPath, clazz));
+          continue;
+        }
+        // Temporary file: skip it and clean it up if it is stale
+        LOG.debug("There is a temporary file {} in {}", name, dir);
+        if (isOldTempRecord(name)) {
+          LOG.warn("Removing {} as it's an old temporary record", name);
+          remove(recordPath);
+        }
+      }
+    } catch (Exception e) {
+      if (metrics != null) {
+        metrics.addFailure(monotonicNow() - startTime);
+      }
+      final String msg = "Cannot fetch records for " + clazz.getSimpleName();
+      LOG.error(msg, e);
+      throw new IOException(msg, e);
+    }
+
+    if (metrics != null) {
+      metrics.addRead(monotonicNow() - startTime);
+    }
+    return new QueryResult<T>(records, getTime());
+  }
+
+  /**
+   * Check if a record is temporary and old. A temporary record is named
+   * file.TIMESTAMP.tmp; it is old when the timestamp is more than
+   * OLD_TMP_RECORD_MS milliseconds in the past.
+   *
+   * @param pathRecord Path for the record to check.
+   * @return If the record is temporary and old. Names that do not follow the
+   *         temporary pattern, or whose timestamp does not fit in a long,
+   *         are reported as not old.
+   */
+  @VisibleForTesting
+  public static boolean isOldTempRecord(final String pathRecord) {
+    if (!pathRecord.endsWith(TMP_MARK)) {
+      return false;
+    }
+    // Extract temporary record creation time
+    Matcher m = OLD_TMP_RECORD_PATTERN.matcher(pathRecord);
+    if (!m.find()) {
+      return false;
+    }
+    try {
+      long time = Long.parseLong(m.group(1));
+      return now() - time > OLD_TMP_RECORD_MS;
+    } catch (NumberFormatException e) {
+      // The pattern accepts any run of digits, which may overflow a long.
+      // Such a name was not produced by putAll, so leave the file alone
+      // instead of failing the whole read.
+      LOG.debug("Cannot parse the timestamp in temporary record {}",
+          pathRecord, e);
+      return false;
+    }
+  }
+
+  /**
+   * Read a record from a file. The first non-empty line that does not start
+   * with '#' and that can be parsed is returned; lines that fail to parse
+   * are logged and skipped.
+   *
+   * @param path Path to the file containing the record.
+   * @param clazz Class of the record.
+   * @return Record read from the file.
+   * @throws IOException If the file cannot be opened or read, or if it
+   *           contains no parseable record.
+   */
+  private <T extends BaseRecord> T getRecord(
+      final String path, final Class<T> clazz) throws IOException {
+    // try-with-resources skips close() for a null resource
+    try (BufferedReader reader = getReader(path)) {
+      if (reader == null) {
+        // getReader reports failures by returning null
+        throw new IOException("Cannot open " + path + " to read record " +
+            clazz.getSimpleName());
+      }
+      String line;
+      while ((line = reader.readLine()) != null) {
+        if (!line.startsWith("#") && line.length() > 0) {
+          try {
+            return newRecord(line, clazz, false);
+          } catch (Exception ex) {
+            LOG.error("Cannot parse line {} in file {}", line, path, ex);
+          }
+        }
+      }
+    }
+    throw new IOException("Cannot read " + path + " for record " +
+        clazz.getSimpleName());
+  }
+
+  /**
+   * Get the path for a record class: the root directory followed by the
+   * record name, separated by exactly one slash when the root does not
+   * already end with one.
+   *
+   * @param clazz Class of the record.
+   * @return Path for this record class.
+   */
+  private <T extends BaseRecord> String getPathForClass(final Class<T> clazz) {
+    final String recordName = StateStoreUtils.getRecordName(clazz);
+    final StringBuilder pathBuilder = new StringBuilder();
+    pathBuilder.append(getRootDir());
+    final int lastIndex = pathBuilder.length() - 1;
+    if (pathBuilder.charAt(lastIndex) != '/') {
+      pathBuilder.append('/');
+    }
+    return pathBuilder.append(recordName).toString();
+  }
+
+  /**
+   * Check if the driver is ready.
+   *
+   * @return The initialized flag, as last set through setInitialized.
+   */
+  @Override
+  public boolean isDriverReady() {
+    return this.initialized;
+  }
+
+  /**
+   * Store a list of records. Each record is first written to a temporary
+   * file named recordPath.TIMESTAMP.tmp and then renamed to its final path.
+   * A record is only committed if its temporary file was written and closed
+   * without errors; otherwise the temporary file is left behind for get to
+   * clean up once it is old.
+   *
+   * @param records Records to store.
+   * @param allowUpdate If existing records may be overwritten.
+   * @param errorIfExists If finding an existing record (when updates are not
+   *          allowed) aborts the operation before anything is written.
+   * @return True if every record selected for writing was committed.
+   * @throws StateStoreUnavailableException If the driver is not ready.
+   */
+  @Override
+  public <T extends BaseRecord> boolean putAll(
+      List<T> records, boolean allowUpdate, boolean errorIfExists)
+          throws StateStoreUnavailableException {
+    verifyDriverReady();
+    if (records.isEmpty()) {
+      return true;
+    }
+
+    long start = monotonicNow();
+    StateStoreMetrics metrics = getMetrics();
+
+    // Check if any record exists
+    Map<String, T> toWrite = new HashMap<>();
+    for (T record : records) {
+      Class<? extends BaseRecord> recordClass = record.getClass();
+      String path = getPathForClass(recordClass);
+      String primaryKey = getPrimaryKey(record);
+      String recordPath = path + "/" + primaryKey;
+
+      if (exists(recordPath)) {
+        if (allowUpdate) {
+          // Update the mod time stamp. Many backends will use their
+          // own timestamp for the mod time.
+          record.setDateModified(this.getTime());
+          toWrite.put(recordPath, record);
+        } else if (errorIfExists) {
+          LOG.error("Attempt to insert record {} that already exists",
+              recordPath);
+          if (metrics != null) {
+            metrics.addFailure(monotonicNow() - start);
+          }
+          return false;
+        } else  {
+          LOG.debug("Not updating {}", record);
+        }
+      } else {
+        toWrite.put(recordPath, record);
+      }
+    }
+
+    // Write the records
+    boolean success = true;
+    for (Entry<String, T> entry : toWrite.entrySet()) {
+      String recordPath = entry.getKey();
+      String recordPathTemp = recordPath + "." + now() + TMP_MARK;
+      if (!writeTempRecord(recordPathTemp, entry.getValue())) {
+        // Never commit a temporary file that may be missing or partial
+        success = false;
+        continue;
+      }
+      // Commit
+      if (!rename(recordPathTemp, recordPath)) {
+        LOG.error("Failed committing record into {}", recordPath);
+        success = false;
+      }
+    }
+
+    long end = monotonicNow();
+    if (metrics != null) {
+      if (success) {
+        metrics.addWrite(end - start);
+      } else {
+        metrics.addFailure(end - start);
+      }
+    }
+    return success;
+  }
+
+  /**
+   * Serialize a record into a temporary file.
+   *
+   * @param recordPathTemp Path of the temporary file to write.
+   * @param record Record to serialize.
+   * @return True only if the writer was opened, the record was written and
+   *         the writer was closed without errors.
+   */
+  private <T extends BaseRecord> boolean writeTempRecord(
+      final String recordPathTemp, final T record) {
+    // try-with-resources skips close() for a null resource; a failure in
+    // close() (which flushes the buffer) lands in the catch below.
+    try (BufferedWriter writer = getWriter(recordPathTemp)) {
+      if (writer == null) {
+        // getWriter reports failures by returning null
+        LOG.error("Cannot open a writer for {}", recordPathTemp);
+        return false;
+      }
+      String line = serializeString(record);
+      writer.write(line);
+      return true;
+    } catch (IOException e) {
+      LOG.error("Cannot write {}", recordPathTemp, e);
+      return false;
+    }
+  }
+
+  /**
+   * Remove the records of a class that match a query. All the records are
+   * read, filtered with the query, and the file of each match is deleted.
+   *
+   * @param clazz Class of the records to remove.
+   * @param query Query selecting the records; null removes nothing.
+   * @return Number of records that were actually removed.
+   * @throws StateStoreUnavailableException If the driver is not ready.
+   */
+  @Override
+  public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
+      throws StateStoreUnavailableException {
+    verifyDriverReady();
+
+    if (query == null) {
+      return 0;
+    }
+
+    final long startTime = Time.monotonicNow();
+    final StateStoreMetrics metrics = getMetrics();
+    int removedCount = 0;
+    try {
+      // Load everything and keep only what the query selects
+      final QueryResult<T> current = get(clazz);
+      final List<T> matching = filterMultiple(query, current.getRecords());
+      boolean anyFailure = false;
+      for (T match : matching) {
+        final String matchPath =
+            getPathForClass(clazz) + "/" + getPrimaryKey(match);
+        if (!remove(matchPath)) {
+          LOG.error("Cannot remove record {}", matchPath);
+          anyFailure = true;
+          continue;
+        }
+        removedCount++;
+      }
+      if (anyFailure) {
+        LOG.error("Cannot remove records {} query {}", clazz, query);
+        if (metrics != null) {
+          metrics.addFailure(monotonicNow() - startTime);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Cannot remove records {} query {}", clazz, query, e);
+      if (metrics != null) {
+        metrics.addFailure(monotonicNow() - startTime);
+      }
+    }
+
+    if (metrics != null && removedCount > 0) {
+      metrics.addRemove(monotonicNow() - startTime);
+    }
+    return removedCount;
+  }
+
+  /**
+   * Remove every child of the data directory of a record class. All the
+   * children are attempted even if some of them cannot be removed.
+   *
+   * @param clazz Class of the records to remove.
+   * @return True if every child was removed.
+   * @throws StateStoreUnavailableException If the driver is not ready.
+   */
+  @Override
+  public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
+      throws StateStoreUnavailableException {
+    verifyDriverReady();
+    final long startTime = Time.monotonicNow();
+    final StateStoreMetrics metrics = getMetrics();
+
+    final String dir = getPathForClass(clazz);
+    boolean allRemoved = true;
+    for (String name : getChildren(dir)) {
+      // Non-short-circuit: keep removing after the first failure
+      allRemoved &= remove(dir + "/" + name);
+    }
+
+    if (metrics != null) {
+      final long elapsed = Time.monotonicNow() - startTime;
+      if (allRemoved) {
+        metrics.addRemove(elapsed);
+      } else {
+        metrics.addFailure(elapsed);
+      }
+    }
+    return allRemoved;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
new file mode 100644
index 0000000..7d9ddc6
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Files;
+
+/**
+ * StateStoreDriver implementation based on a local file.
+ */
+public class StateStoreFileImpl extends StateStoreFileBaseImpl {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreFileImpl.class);
+
+  /**
+   * Configuration key for the local directory that backs the State Store.
+   * When it is not set, getRootDir falls back to a new temporary directory.
+   */
+  public static final String FEDERATION_STORE_FILE_DIRECTORY =
+      RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.file.directory";
+
+  /** Root directory for the state store; resolved lazily by getRootDir. */
+  private String rootDirectory;
+
+
+  /**
+   * Check if a local path exists.
+   *
+   * @param path Local path to check.
+   * @return If the path exists.
+   */
+  @Override
+  protected boolean exists(String path) {
+    return new File(path).exists();
+  }
+
+  /**
+   * Create a local directory, including any missing parents.
+   *
+   * @param path Local path of the directory to create.
+   * @return If the directory was created.
+   */
+  @Override
+  protected boolean mkdir(String path) {
+    return new File(path).mkdirs();
+  }
+
+  /**
+   * Move a local file to a new name.
+   *
+   * @param src Source name.
+   * @param dst Destination name.
+   * @return If the move succeeded; failures are logged.
+   */
+  @Override
+  protected boolean rename(String src, String dst) {
+    final File from = new File(src);
+    final File to = new File(dst);
+    try {
+      Files.move(from, to);
+    } catch (IOException e) {
+      LOG.error("Cannot rename {} to {}", src, dst, e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Delete a local file.
+   *
+   * @param path Local path of the file to delete.
+   * @return If the file was deleted.
+   */
+  @Override
+  protected boolean remove(String path) {
+    return new File(path).delete();
+  }
+
+  /**
+   * Get the root directory, resolving it on first use. The configured
+   * directory is preferred; without one a new temporary directory is
+   * created and a warning is logged.
+   *
+   * @return Root directory for the State Store.
+   */
+  @Override
+  protected String getRootDir() {
+    if (this.rootDirectory != null) {
+      return this.rootDirectory;
+    }
+    String resolved = getConf().get(FEDERATION_STORE_FILE_DIRECTORY);
+    if (resolved == null) {
+      resolved = Files.createTempDir().getAbsolutePath();
+      LOG.warn("The root directory is not available, using {}", resolved);
+    }
+    this.rootDirectory = resolved;
+    return this.rootDirectory;
+  }
+
+  /**
+   * Open a UTF-8 reader over a local file.
+   *
+   * @param filename Local path of the file to read.
+   * @return Reader for the file, or null if it cannot be opened (the error
+   *         is logged).
+   */
+  @Override
+  protected <T extends BaseRecord> BufferedReader getReader(String filename) {
+    try {
+      LOG.debug("Loading file: {}", filename);
+      final FileInputStream in = new FileInputStream(new File(filename));
+      return new BufferedReader(
+          new InputStreamReader(in, StandardCharsets.UTF_8));
+    } catch (Exception ex) {
+      LOG.error("Cannot open read stream for record {}", filename, ex);
+      return null;
+    }
+  }
+
+  /**
+   * Open a UTF-8 writer over a local file, truncating any previous content.
+   *
+   * @param filename Local path of the file to write.
+   * @return Writer for the file, or null if it cannot be opened (the error
+   *         is logged).
+   */
+  @Override
+  protected <T extends BaseRecord> BufferedWriter getWriter(String filename) {
+    try {
+      LOG.debug("Writing file: {}", filename);
+      final FileOutputStream out =
+          new FileOutputStream(new File(filename), false);
+      return new BufferedWriter(
+          new OutputStreamWriter(out, StandardCharsets.UTF_8));
+    } catch (IOException e) {
+      LOG.error("Cannot open write stream for record {}", filename, e);
+      return null;
+    }
+  }
+
+  /**
+   * Close the driver. There is nothing to release for local files, so this
+   * only clears the initialized flag.
+   */
+  @Override
+  public void close() throws Exception {
+    setInitialized(false);
+  }
+
+  /**
+   * List the names of the entries in a local directory.
+   *
+   * @param path Local path of the directory.
+   * @return Names of the entries; empty if the directory cannot be listed.
+   */
+  @Override
+  protected List<String> getChildren(String path) {
+    final List<String> names = new LinkedList<>();
+    final File[] entries = new File(path).listFiles();
+    if (entries == null) {
+      // listFiles returned nothing to iterate over
+      return names;
+    }
+    for (File entry : entries) {
+      names.add(entry.getName());
+    }
+    return names;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
new file mode 100644
index 0000000..ad822fb
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * StateStoreDriver implementation based on a filesystem. The most common use
+ * is HDFS as a backend.
+ */
+public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreFileSystemImpl.class);
+
+
+  /**
+   * Configuration key for the path, parsed as a URI, that backs the State
+   * Store.
+   */
+  public static final String FEDERATION_STORE_FS_PATH =
+      RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.fs.path";
+
+  /** File system to back the State Store; obtained by getRootDir. */
+  private FileSystem fs;
+  /** Working path in the filesystem; resolved lazily by getRootDir. */
+  private String workPath;
+
+  /**
+   * Check if a path exists in the file system.
+   *
+   * @param path Path to check.
+   * @return If the path exists; false (with the error logged) if the file
+   *         system cannot be queried.
+   */
+  @Override
+  protected boolean exists(String path) {
+    try {
+      return fs.exists(new Path(path));
+    } catch (IOException e) {
+      // Keep reporting "does not exist", but leave a trace of the cause
+      LOG.error("Cannot check if {} exists", path, e);
+      return false;
+    }
+  }
+
+  /**
+   * Create a directory in the file system.
+   *
+   * @param path Path of the directory to create.
+   * @return If the directory was created; false (with the error logged) if
+   *         the file system call fails.
+   */
+  @Override
+  protected boolean mkdir(String path) {
+    try {
+      return fs.mkdirs(new Path(path));
+    } catch (IOException e) {
+      // Keep reporting failure, but leave a trace of the cause
+      LOG.error("Cannot create directory {}", path, e);
+      return false;
+    }
+  }
+
+  /**
+   * Rename a file, replacing the destination if it exists. On a
+   * DistributedFileSystem the overwrite is a single rename call; on any
+   * other file system the destination is deleted first and then the source
+   * is renamed, which is two separate steps.
+   *
+   * @param src Source name.
+   * @param dst Destination name.
+   * @return If the rename succeeded; failures are logged.
+   */
+  @Override
+  protected boolean rename(String src, String dst) {
+    try {
+      if (!(fs instanceof DistributedFileSystem)) {
+        // Replace should be atomic but not available
+        if (fs.exists(new Path(dst))) {
+          fs.delete(new Path(dst), true);
+        }
+        return fs.rename(new Path(src), new Path(dst));
+      }
+      final DistributedFileSystem hdfs = (DistributedFileSystem) fs;
+      hdfs.rename(new Path(src), new Path(dst), Options.Rename.OVERWRITE);
+      return true;
+    } catch (Exception e) {
+      LOG.error("Cannot rename {} to {}", src, dst, e);
+      return false;
+    }
+  }
+
+  /**
+   * Delete a path from the file system, recursively.
+   *
+   * @param path Path to delete.
+   * @return If the path was deleted; failures are logged.
+   */
+  @Override
+  protected boolean remove(String path) {
+    boolean deleted;
+    try {
+      deleted = fs.delete(new Path(path), true);
+    } catch (Exception e) {
+      LOG.error("Cannot remove {}", path, e);
+      deleted = false;
+    }
+    return deleted;
+  }
+
+  /**
+   * Get the root path, resolving it on first use. Resolving it also obtains
+   * the file system that backs the State Store from the configured path.
+   *
+   * @return Root path, or null (with the cause logged) if the path is not
+   *         configured or its file system cannot be obtained.
+   */
+  @Override
+  protected String getRootDir() {
+    if (this.workPath == null) {
+      String rootPath = getConf().get(FEDERATION_STORE_FS_PATH);
+      if (rootPath == null) {
+        LOG.error("The State Store path {} is not configured",
+            FEDERATION_STORE_FS_PATH);
+        return null;
+      }
+      try {
+        URI workUri = new URI(rootPath);
+        fs = FileSystem.get(workUri, getConf());
+      } catch (Exception ex) {
+        // Still report "no root", but do not lose the reason
+        LOG.error("Cannot get the file system for State Store path {}",
+            rootPath, ex);
+        return null;
+      }
+      this.workPath = rootPath;
+    }
+    return this.workPath;
+  }
+
+  /**
+   * Close the driver by closing the backing file system, if one was
+   * obtained.
+   * <p>
+   * NOTE(review): unlike StateStoreFileImpl#close, this does not call
+   * setInitialized(false), so isDriverReady keeps returning true after the
+   * file system is closed - confirm whether that is intended.
+   *
+   * @throws Exception If the file system cannot be closed.
+   */
+  @Override
+  public void close() throws Exception {
+    if (fs != null) {
+      fs.close();
+    }
+  }
+
+  /**
+   * Open a UTF-8 reader over a file in the file system.
+   *
+   * @param pathName Path of the file to read.
+   * @return Reader for the file, or null if it cannot be opened (the error
+   *         is logged together with its cause).
+   */
+  @Override
+  protected <T extends BaseRecord> BufferedReader getReader(String pathName) {
+    BufferedReader reader = null;
+    Path path = new Path(pathName);
+    try {
+      FSDataInputStream fdis = fs.open(path);
+      InputStreamReader isr =
+          new InputStreamReader(fdis, StandardCharsets.UTF_8);
+      reader = new BufferedReader(isr);
+    } catch (IOException ex) {
+      // Pass the exception as the last argument so the cause is logged
+      LOG.error("Cannot open read stream for {}", path, ex);
+    }
+    return reader;
+  }
+
+  /**
+   * Open a UTF-8 writer over a file in the file system, overwriting it if
+   * it already exists.
+   *
+   * @param pathName Path of the file to write.
+   * @return Writer for the file, or null if it cannot be opened (the error
+   *         is logged together with its cause).
+   */
+  @Override
+  protected <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
+    BufferedWriter writer = null;
+    Path path = new Path(pathName);
+    try {
+      FSDataOutputStream fdos = fs.create(path, true);
+      OutputStreamWriter osw =
+          new OutputStreamWriter(fdos, StandardCharsets.UTF_8);
+      writer = new BufferedWriter(osw);
+    } catch (IOException ex) {
+      // Pass the exception as the last argument so the cause is logged
+      LOG.error("Cannot open write stream for {}", path, ex);
+    }
+    return writer;
+  }
+
+  /**
+   * List the names of the entries under a path in the file system.
+   *
+   * @param pathName Path to list, resolved against the working path.
+   * @return Names of the entries; whatever was collected before a failure
+   *         (possibly nothing) if the listing fails, with the error logged.
+   */
+  @Override
+  protected List<String> getChildren(String pathName) {
+    final List<String> names = new LinkedList<>();
+    final Path dir = new Path(workPath, pathName);
+    try {
+      for (FileStatus status : fs.listStatus(dir)) {
+        names.add(status.getPath().getName());
+      }
+    } catch (Exception e) {
+      LOG.error("Cannot get children for {}", pathName, e);
+    }
+    return names;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
new file mode 100644
index 0000000..7bc93de
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
+import 
org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+
+/**
+ * State Store driver that stores a serialization of the records. The
+ * serializer is pluggable.
+ */
+public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl {
+
+  /** Mark for slashes in path names. */
+  protected static final String SLASH_MARK = "0SLASH0";
+  /** Mark for colon in path names. */
+  protected static final String COLON_MARK = "_";
+
+  /** Default serializer for this driver. */
+  private StateStoreSerializer serializer;
+
+
+  /**
+   * Initializes the parent driver and then obtains the serializer from the
+   * configuration.
+   *
+   * @return The result of the parent initialization.
+   */
+  @Override
+  public boolean init(final Configuration config, final String id,
+      final Collection<Class<? extends BaseRecord>> records,
+      final StateStoreMetrics metrics) {
+    boolean ret = super.init(config, id, records, metrics);
+
+    this.serializer = StateStoreSerializer.getSerializer(config);
+
+    return ret;
+  }
+
+  /**
+   * Serialize a record using the serializer.
+   * @param record Record to serialize.
+   * @return Byte array with the serialization of the record.
+   */
+  protected <T extends BaseRecord> byte[] serialize(T record) {
+    return serializer.serialize(record);
+  }
+
+  /**
+   * Serialize a record using the serializer.
+   * @param record Record to serialize.
+   * @return String with the serialization of the record.
+   */
+  protected <T extends BaseRecord> String serializeString(T record) {
+    return serializer.serializeString(record);
+  }
+
+  /**
+   * Creates a record from an input data string.
+   * @param data Serialized text of the record.
+   * @param clazz Record class.
+   * @param includeDates If dateModified and dateCreated are serialized. This
+   *                     implementation does not use the flag.
+   * @return The created record.
+   * @throws IOException If thrown by the serializer while deserializing.
+   */
+  protected <T extends BaseRecord> T newRecord(
+      String data, Class<T> clazz, boolean includeDates) throws IOException {
+    return serializer.deserialize(data, clazz);
+  }
+
+  /**
+   * Get the primary key for a record. If we don't want to store in folders, we
+   * need to remove / from the name.
+   *
+   * @param record Record to get the primary key for.
+   * @return Primary key for the record with every "/" replaced by
+   *         {@link #SLASH_MARK} and every ":" by {@link #COLON_MARK}.
+   */
+  protected static String getPrimaryKey(BaseRecord record) {
+    // Literal replacement: no regular expression is needed for these marks
+    return record.getPrimaryKey()
+        .replace("/", SLASH_MARK)
+        .replace(":", COLON_MARK);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java
new file mode 100644
index 0000000..45c5dd6
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import java.io.IOException;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import 
org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.protobuf.Message;
+
+/**
+ * Protobuf implementation of the State Store serializer.
+ */
+public final class StateStoreSerializerPBImpl extends StateStoreSerializer {
+
+  private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb";
+  private static final String PB_IMPL_CLASS_SUFFIX = "PBImpl";
+
+  private Configuration localConf = new Configuration();
+
+
+  private StateStoreSerializerPBImpl() {
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> T newRecordInstance(Class<T> clazz) {
+    try {
+      String clazzPBImpl = getPBImplClassName(clazz);
+      Class<?> pbClazz = localConf.getClassByName(clazzPBImpl);
+      Object retObject = ReflectionUtils.newInstance(pbClazz, localConf);
+      return (T)retObject;
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private String getPBImplClassName(Class<?> clazz) {
+    String srcPackagePart = getPackageName(clazz);
+    String srcClassName = getClassName(clazz);
+    String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX;
+    String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX;
+    return destPackagePart + "." + destClassPart;
+  }
+
+  private String getClassName(Class<?> clazz) {
+    String fqName = clazz.getName();
+    return (fqName.substring(fqName.lastIndexOf(".") + 1, fqName.length()));
+  }
+
+  private String getPackageName(Class<?> clazz) {
+    return clazz.getPackage().getName();
+  }
+
+  @Override
+  public byte[] serialize(BaseRecord record) {
+    byte[] byteArray64 = null;
+    if (record instanceof PBRecord) {
+      PBRecord recordPB = (PBRecord) record;
+      Message msg = recordPB.getProto();
+      byte[] byteArray = msg.toByteArray();
+      byteArray64 = Base64.encodeBase64(byteArray, false);
+    }
+    return byteArray64;
+  }
+
+  @Override
+  public String serializeString(BaseRecord record) {
+    byte[] byteArray64 = serialize(record);
+    String base64Encoded = StringUtils.newStringUtf8(byteArray64);
+    return base64Encoded;
+  }
+
+  @Override
+  public <T extends BaseRecord> T deserialize(
+      byte[] byteArray, Class<T> clazz) throws IOException {
+
+    T record = newRecord(clazz);
+    if (record instanceof PBRecord) {
+      PBRecord pbRecord = (PBRecord) record;
+      byte[] byteArray64 = Base64.encodeBase64(byteArray, false);
+      String base64Encoded = StringUtils.newStringUtf8(byteArray64);
+      pbRecord.readInstance(base64Encoded);
+    }
+    return record;
+  }
+
+  @Override
+  public <T extends BaseRecord> T deserialize(String data, Class<T> clazz)
+      throws IOException {
+    byte[] byteArray64 = Base64.decodeBase64(data);
+    return deserialize(byteArray64, clazz);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
new file mode 100644
index 0000000..cd5372d
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import static 
org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
+import static 
org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordName;
+import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StateStoreDriver} driver implementation that uses ZooKeeper as a
+ * backend.
+ * <p>
+ * The structure of the znodes in the ensemble is:
+ * PARENT_PATH
+ * |--- MOUNT
+ * |--- MEMBERSHIP
+ * |--- REBALANCER
+ * |--- ROUTERS
+ */
+public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreZooKeeperImpl.class);
+
+
+  /** Configuration keys. */
+  public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX =
+      RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk.";
+  public static final String FEDERATION_STORE_ZK_PARENT_PATH =
+      FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path";
+  public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT =
+      "/hdfs-federation";
+
+
+  /** Directory to store the state store data. */
+  private String baseZNode;
+
+  /** Interface to ZooKeeper. */
+  private ZKCuratorManager zkManager;
+
+
+  /**
+   * Reads the parent znode from the configuration and starts the ZooKeeper
+   * manager.
+   *
+   * @return True if the manager started, false if an IOException was thrown.
+   */
+  @Override
+  public boolean initDriver() {
+    LOG.info("Initializing ZooKeeper connection");
+
+    final Configuration zkConf = getConf();
+    this.baseZNode = zkConf.get(
+        FEDERATION_STORE_ZK_PARENT_PATH,
+        FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
+    try {
+      this.zkManager = new ZKCuratorManager(zkConf);
+      this.zkManager.start();
+      return true;
+    } catch (IOException e) {
+      LOG.error("Cannot initialize the ZK connection", e);
+      return false;
+    }
+  }
+
+  /**
+   * Creates the znode that holds the records of a type.
+   *
+   * @param className Name of the record type; used as the child znode name.
+   * @param clazz Record class (not used by this implementation).
+   * @return True if the znode path was created without an exception.
+   */
+  @Override
+  public <T extends BaseRecord> boolean initRecordStorage(
+      String className, Class<T> clazz) {
+    try {
+      zkManager.createRootDirRecursively(getNodePath(baseZNode, className));
+    } catch (Exception e) {
+      LOG.error("Cannot initialize ZK node for {}: {}",
+          className, e.getMessage());
+      return false;
+    }
+    return true;
+  }
+
+  /** Closes the ZooKeeper manager if one was created. */
+  @Override
+  public void close() throws Exception {
+    if (zkManager == null) {
+      return;
+    }
+    zkManager.close();
+  }
+
+  /**
+   * Checks if the driver can be used.
+   *
+   * @return True only if the manager and its Curator client exist and the
+   *         client is in the STARTED state.
+   */
+  @Override
+  public boolean isDriverReady() {
+    if (zkManager == null) {
+      return false;
+    }
+    final CuratorFramework client = zkManager.getCurator();
+    return client != null
+        && client.getState() == CuratorFrameworkState.STARTED;
+  }
+
+  /**
+   * Reads all the records of a type from the children of its znode. A child
+   * whose data is empty or cannot be turned into a record is logged and
+   * deleted; a failure on a single child is logged and the child is skipped.
+   *
+   * @param clazz Record class to read.
+   * @return The records that could be read and the current driver time.
+   * @throws IOException If the driver is not ready or the children of the
+   *                     znode cannot be listed; the underlying exception is
+   *                     attached as the cause.
+   */
+  @Override
+  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
+      throws IOException {
+    verifyDriverReady();
+    long start = monotonicNow();
+    List<T> ret = new ArrayList<>();
+    String znode = getZNodeForClass(clazz);
+    try {
+      List<String> children = zkManager.getChildren(znode);
+      for (String child : children) {
+        try {
+          String path = getNodePath(znode, child);
+          Stat stat = new Stat();
+          String data = zkManager.getStringData(path, stat);
+          boolean corrupted = false;
+          if (data == null || data.equals("")) {
+            // All records should have data, otherwise this is corrupted
+            corrupted = true;
+          } else {
+            try {
+              T record = createRecord(data, stat, clazz);
+              ret.add(record);
+            } catch (IOException e) {
+              LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
+                  clazz.getSimpleName(), data, e.getMessage());
+              corrupted = true;
+            }
+          }
+
+          if (corrupted) {
+            LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
+                child, path);
+            zkManager.delete(path);
+          }
+        } catch (Exception e) {
+          // Best effort: one unreadable child must not fail the whole read
+          LOG.error("Cannot get data for {}: {}", child, e.getMessage());
+        }
+      }
+    } catch (Exception e) {
+      getMetrics().addFailure(monotonicNow() - start);
+      String msg = "Cannot get children for \"" + znode + "\": " +
+          e.getMessage();
+      LOG.error(msg);
+      // Keep the original exception as the cause for the caller
+      throw new IOException(msg, e);
+    }
+    long end = monotonicNow();
+    getMetrics().addRead(end - start);
+    return new QueryResult<T>(ret, getTime());
+  }
+
+  /**
+   * Writes a list of records, one znode per record, under the znode of the
+   * class of the first record.
+   *
+   * @param records Records to write.
+   * @param update Passed to the node writer as the update flag.
+   * @param error Passed to the node writer as the error flag.
+   * @return True if the list is empty or every record was written.
+   * @throws IOException If the driver is not ready.
+   */
+  @Override
+  public <T extends BaseRecord> boolean putAll(
+      List<T> records, boolean update, boolean error) throws IOException {
+    verifyDriverReady();
+    if (records.isEmpty()) {
+      return true;
+    }
+
+    // All records should be the same
+    final Class<? extends BaseRecord> recordClass = records.get(0).getClass();
+    final String znode = getZNodeForClass(recordClass);
+
+    final long start = monotonicNow();
+    boolean allWritten = true;
+    for (T record : records) {
+      final String recordZNode = getNodePath(znode, getPrimaryKey(record));
+      final byte[] data = serialize(record);
+      // Keep going after a failure so the remaining records are attempted
+      final boolean written = writeNode(recordZNode, data, update, error);
+      allWritten = written && allWritten;
+    }
+
+    final long elapsed = monotonicNow() - start;
+    if (!allWritten) {
+      getMetrics().addFailure(elapsed);
+    } else {
+      getMetrics().addWrite(elapsed);
+    }
+    return allWritten;
+  }
+
+  /**
+   * Removes the records of a type that match a query.
+   *
+   * @param clazz Record class to remove from.
+   * @param query Query to select the records; a null query removes nothing.
+   * @return Number of records deleted; 0 if the existing records could not
+   *         be read.
+   * @throws IOException If the driver is not ready.
+   */
+  @Override
+  public <T extends BaseRecord> int remove(
+      Class<T> clazz, Query<T> query) throws IOException {
+    verifyDriverReady();
+    if (query == null) {
+      return 0;
+    }
+
+    // Read the current data
+    final long start = monotonicNow();
+    final List<T> current;
+    try {
+      current = get(clazz).getRecords();
+    } catch (IOException ex) {
+      LOG.error("Cannot get existing records", ex);
+      getMetrics().addFailure(monotonicNow() - start);
+      return 0;
+    }
+
+    // Check the records to remove
+    final String znode = getZNodeForClass(clazz);
+    final List<T> matches = filterMultiple(query, current);
+
+    // Remove the records
+    int removed = 0;
+    for (T match : matches) {
+      LOG.info("Removing \"{}\"", match);
+      try {
+        final String path = getNodePath(znode, getPrimaryKey(match));
+        if (!zkManager.delete(path)) {
+          LOG.error("Did not remove \"{}\"", match);
+        } else {
+          removed++;
+        }
+      } catch (Exception e) {
+        LOG.error("Cannot remove \"{}\"", match, e);
+        getMetrics().addFailure(monotonicNow() - start);
+      }
+    }
+    final long elapsed = monotonicNow() - start;
+    if (removed > 0) {
+      getMetrics().addRemove(elapsed);
+    }
+    return removed;
+  }
+
+  /**
+   * Deletes every child under the znode of a record type.
+   *
+   * @param clazz Record class whose records are removed.
+   * @return True if all children were deleted without an exception.
+   */
+  @Override
+  public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
+      throws IOException {
+    final long start = monotonicNow();
+    final String znode = getZNodeForClass(clazz);
+    LOG.info("Deleting all children under {}", znode);
+
+    boolean status = true;
+    try {
+      for (String child : zkManager.getChildren(znode)) {
+        final String path = getNodePath(znode, child);
+        LOG.info("Deleting {}", path);
+        zkManager.delete(path);
+      }
+    } catch (Exception e) {
+      // The first failure stops the loop; later children are not attempted
+      LOG.error("Cannot remove {}: {}", znode, e.getMessage());
+      status = false;
+    }
+
+    final long elapsed = monotonicNow() - start;
+    if (!status) {
+      getMetrics().addFailure(elapsed);
+    } else {
+      getMetrics().addRemove(elapsed);
+    }
+    return status;
+  }
+
+  private boolean writeNode(
+      String znode, byte[] bytes, boolean update, boolean error) {
+    try {
+      boolean created = zkManager.create(znode);
+      if (!update && !created && error) {
+        LOG.info("Cannot write record \"{}\", it already exists", znode);
+        return false;
+      }
+
+      // Write data
+      zkManager.setData(znode, bytes, -1);
+      return true;
+    } catch (Exception e) {
+      LOG.error("Cannot write record \"{}\": {}", znode, e.getMessage());
+    }
+    return false;
+  }
+
+  /**
+   * Resolves the znode that holds the records of a class: the record name of
+   * the class under the base znode.
+   *
+   * @param clazz Record class to evaluate.
+   * @return The ZNode for the class.
+   */
+  private <T extends BaseRecord> String getZNodeForClass(Class<T> clazz) {
+    return getNodePath(baseZNode, getRecordName(clazz));
+  }
+
+  /**
+   * Creates a record from a string returned by ZooKeeper.
+   *
+   * @param data The data to write.
+   * @param stat Stat of the data record to create.
+   * @param clazz The data record type to create.
+   * @return The created record.
+   * @throws IOException
+   */
+  private <T extends BaseRecord> T createRecord(
+      String data, Stat stat, Class<T> clazz) throws IOException {
+    T record = newRecord(data, clazz, false);
+    record.setDateCreated(stat.getCtime());
+    record.setDateModified(stat.getMtime());
+    return record;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java
new file mode 100644
index 0000000..a18433e
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementations of state store data providers/drivers. Each driver is
+ * responsible for maintaining, querying, updating and deleting persistent data
+ * records. Data records are defined as subclasses of
+ * {@link org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord}.
+ * Each driver implements the
+ * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
+ * StateStoreDriver} interface.
+ * <p>
+ * Currently supported drivers:
+ * <ul>
+ * <li>{@link StateStoreFileImpl} A file-based data storage backend.
+ * <li>{@link StateStoreZooKeeperImpl} A zookeeper based data storage backend.
+ * </ul>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java
new file mode 100644
index 0000000..da998b5
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The state store uses modular data storage classes derived from
+ * StateStoreDriver to handle querying, updating and deleting data records. The
+ * data storage driver is initialized and maintained by the StateStoreService.
+ * The state store supports fetching all records of a type, filtering by column
+ * values or fetching a single record by its primary key.
+ * <p>
+ * Each data storage backend is required to implement the methods contained in
+ * the StateStoreDriver interface. These methods allow the querying, updating,
+ * inserting and deleting of data records into the state store.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+package org.apache.hadoop.hdfs.server.federation.store.driver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
new file mode 100644
index 0000000..57b7b61
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.impl;
+
+import static 
org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import 
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import 
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of the {@link MembershipStore} State Store API.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MembershipStoreImpl
+    extends MembershipStore implements StateStoreCache {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MembershipStoreImpl.class);
+
+
+  /** Reported namespaces that are not decommissioned. */
+  private final Set<FederationNamespaceInfo> activeNamespaces;
+
+  /** Namenodes (after evaluating the quorum) that are active in the cluster. */
+  private final Map<String, MembershipState> activeRegistrations;
+  /** Namenode status reports (raw) that were discarded for being too old. */
+  private final Map<String, MembershipState> expiredRegistrations;
+
+  /** Lock to access the local memory cache. */
+  private final ReadWriteLock cacheReadWriteLock =
+      new ReentrantReadWriteLock();
+  private final Lock cacheReadLock = cacheReadWriteLock.readLock();
+  private final Lock cacheWriteLock = cacheReadWriteLock.writeLock();
+
+
+  /**
+   * Creates the membership store on top of a State Store driver. The local
+   * caches start empty.
+   *
+   * @param driver State Store driver handed to the parent store.
+   */
+  public MembershipStoreImpl(StateStoreDriver driver) {
+    super(driver);
+
+    this.activeNamespaces = new TreeSet<>();
+    this.activeRegistrations = new HashMap<>();
+    this.expiredRegistrations = new HashMap<>();
+  }
+
+  /**
+   * Returns a copy of the expired registrations held in the local cache.
+   *
+   * @param request Request (not read by this implementation).
+   * @return Response carrying a copy of the cached expired registrations.
+   */
+  @Override
+  public GetNamenodeRegistrationsResponse getExpiredNamenodeRegistrations(
+      GetNamenodeRegistrationsRequest request) throws IOException {
+
+    final GetNamenodeRegistrationsResponse response =
+        GetNamenodeRegistrationsResponse.newInstance();
+    cacheReadLock.lock();
+    try {
+      // Copy under the read lock so the response does not share the cache
+      response.setNamenodeMemberships(
+          new ArrayList<>(this.expiredRegistrations.values()));
+    } finally {
+      cacheReadLock.unlock();
+    }
+    return response;
+  }
+
+  /**
+   * Returns a copy of the active namespaces held in the local cache.
+   *
+   * @param request Request (not read by this implementation).
+   * @return Response carrying a copy of the cached active namespaces.
+   */
+  @Override
+  public GetNamespaceInfoResponse getNamespaceInfo(
+      GetNamespaceInfoRequest request) throws IOException {
+
+    Set<FederationNamespaceInfo> namespaces = new HashSet<>();
+    // Acquire the lock before entering the try block: if lock() failed
+    // inside it, the finally clause would unlock a lock that is not held.
+    cacheReadLock.lock();
+    try {
+      namespaces.addAll(activeNamespaces);
+    } finally {
+      cacheReadLock.unlock();
+    }
+
+    GetNamespaceInfoResponse response =
+        GetNamespaceInfoResponse.newInstance(namespaces);
+    return response;
+  }
+
+  /**
+   * Returns the active registrations from the local cache, optionally
+   * filtered by the partial membership of the request, in sorted order.
+   *
+   * @param request Request; its partial membership, if not null, is used as
+   *                the filter query.
+   * @return Response with the sorted matching registrations.
+   */
+  @Override
+  public GetNamenodeRegistrationsResponse getNamenodeRegistrations(
+      final GetNamenodeRegistrationsRequest request) throws IOException {
+
+    // TODO Cache some common queries and sorts
+    final List<MembershipState> matches;
+
+    cacheReadLock.lock();
+    try {
+      final Collection<MembershipState> cached = activeRegistrations.values();
+      final MembershipState partial = request.getPartialMembership();
+      if (partial != null) {
+        matches = filterMultiple(new Query<>(partial), cached);
+      } else {
+        matches = new ArrayList<>(cached);
+      }
+    } finally {
+      cacheReadLock.unlock();
+    }
+    // Sort in ascending update date order
+    Collections.sort(matches);
+
+    return GetNamenodeRegistrationsResponse.newInstance(matches);
+  }
+
+  /**
+   * Stores the membership record of a heartbeat through the driver. The
+   * cached registration with the same namenode key is only used to choose
+   * the log message.
+   *
+   * @param request Request carrying the namenode membership to store.
+   * @return Response with the result of the driver write.
+   */
+  @Override
+  public NamenodeHeartbeatResponse namenodeHeartbeat(
+      NamenodeHeartbeatRequest request) throws IOException {
+
+    final MembershipState record = request.getNamenodeMembership();
+    final String nnId = record.getNamenodeKey();
+
+    final MembershipState cached;
+    cacheReadLock.lock();
+    try {
+      cached = this.activeRegistrations.get(nnId);
+    } finally {
+      cacheReadLock.unlock();
+    }
+
+    if (cached == null) {
+      LOG.info("Inserting new NN registration: {}", record);
+    } else if (cached.getState() != record.getState()) {
+      LOG.info("NN registration state has changed: {} -> {}",
+          cached, record);
+    } else {
+      LOG.debug("Updating NN registration: {} -> {}", cached, record);
+    }
+
+    final boolean status = getDriver().put(record, true, false);
+    return NamenodeHeartbeatResponse.newInstance(status);
+  }
+
+  /**
+   * Reloads the parent cache and rebuilds the local membership views.
+   * <p>
+   * Under the cache write lock the three local structures are cleared and
+   * repopulated from the cached records: expired records go to
+   * {@code expiredRegistrations}; the remaining records are grouped by
+   * namenode key, contribute their namespace to {@code activeNamespaces},
+   * and one representative per group is stored in
+   * {@code activeRegistrations}.
+   *
+   * @param force Passed through to the parent implementation.
+   * @return Always true; the result of the parent call is not inspected.
+   * @throws IOException Declared by the signature.
+   */
+  @Override
+  public boolean loadCache(boolean force) throws IOException {
+    super.loadCache(force);
+
+    // Swap the contents of the local views while holding the write lock
+    cacheWriteLock.lock();
+    try {
+      this.activeRegistrations.clear();
+      this.expiredRegistrations.clear();
+      this.activeNamespaces.clear();
+
+      // Namenode key -> every non-expired registration reported for it
+      Map<String, List<MembershipState>> byNamenode = new HashMap<>();
+      List<MembershipState> allRecords = getCachedRecords();
+      for (MembershipState entry : allRecords) {
+        String nnId = entry.getNamenodeKey();
+        if (entry.getState() == FederationNamenodeServiceState.EXPIRED) {
+          // Expired records are kept apart, indexed by primary key
+          this.expiredRegistrations.put(entry.getPrimaryKey(), entry);
+          continue;
+        }
+
+        // Non-expired record: add it to the group used for the quorum
+        List<MembershipState> group = byNamenode.get(nnId);
+        if (group == null) {
+          group = new LinkedList<>();
+          byNamenode.put(nnId, group);
+        }
+        group.add(entry);
+
+        FederationNamespaceInfo namespace = new FederationNamespaceInfo(
+            entry.getBlockPoolId(),
+            entry.getClusterId(),
+            entry.getNameserviceId());
+        this.activeNamespaces.add(namespace);
+      }
+
+      // Keep a single representative record for each namenode
+      for (List<MembershipState> group : byNamenode.values()) {
+        MembershipState representative = getRepresentativeQuorum(group);
+        this.activeRegistrations.put(
+            representative.getNamenodeKey(), representative);
+      }
+      LOG.debug("Refreshed {} NN registrations from State Store",
+          allRecords.size());
+    } finally {
+      cacheWriteLock.unlock();
+    }
+    return true;
+  }
+
+  /**
+   * Overrides the state of a cached active registration.
+   * <p>
+   * The namenode key is built from the nameservice and namenode identifiers
+   * in the request. If the local cache holds an active registration for that
+   * key, its state is replaced with the requested one. Only the cached
+   * object is modified here; the driver is not called.
+   *
+   * @param request Identifies the namenode and carries the new state.
+   * @return Response telling whether a cached registration was found and
+   *         updated.
+   * @throws IOException Declared by the signature.
+   */
+  @Override
+  public UpdateNamenodeRegistrationResponse updateNamenodeRegistration(
+      UpdateNamenodeRegistrationRequest request) throws IOException {
+
+    boolean found = false;
+    cacheWriteLock.lock();
+    try {
+      MembershipState cached = this.activeRegistrations.get(
+          MembershipState.getNamenodeKey(
+              request.getNameserviceId(), request.getNamenodeId()));
+      if (cached != null) {
+        cached.setState(request.getState());
+        found = true;
+      }
+    } finally {
+      cacheWriteLock.unlock();
+    }
+    return UpdateNamenodeRegistrationResponse.newInstance(found);
+  }
+
+  /**
+   * Chooses the record that best represents a group of registrations.
+   * <ol>
+   * <li>The records are grouped by state; each group is kept in a set sorted
+   * by the natural ordering of the records.</li>
+   * <li>If the largest group holds more than half of the records, the first
+   * element of that group is returned.</li>
+   * <li>Otherwise, the first element of all the records sorted together is
+   * returned.</li>
+   * </ol>
+   *
+   * @param records Registrations to choose from.
+   * @return Representative record, or null if there are no records.
+   */
+  private MembershipState getRepresentativeQuorum(
+      Collection<MembershipState> records) {
+
+    // Group the records: state -> sorted set of records in that state
+    Map<FederationNamenodeServiceState, TreeSet<MembershipState>> byState =
+        new HashMap<>();
+    for (MembershipState candidate : records) {
+      FederationNamenodeServiceState state = candidate.getState();
+      TreeSet<MembershipState> group = byState.get(state);
+      if (group == null) {
+        group = new TreeSet<>();
+        byState.put(state, group);
+      }
+      group.add(candidate);
+    }
+
+    // Find the biggest group; a later group only wins if strictly bigger
+    TreeSet<MembershipState> biggest = new TreeSet<>();
+    for (TreeSet<MembershipState> group : byState.values()) {
+      if (group.size() > biggest.size()) {
+        biggest = group;
+      }
+    }
+
+    int total = records.size();
+    if (biggest.size() > total / 2) {
+      // Quorum reached: first element of the majority group
+      return biggest.first();
+    }
+    if (total <= 0) {
+      return null;
+    }
+
+    // No quorum: fall back to the first element across all the records
+    TreeSet<MembershipState> sorted = new TreeSet<>(records);
+    MembershipState first = sorted.first();
+    LOG.debug("Quorum failed, using most recent: {}", first);
+    return first;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
new file mode 100644
index 0000000..eb117d6
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.impl;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer;
+import org.apache.hadoop.hdfs.server.federation.router.RouterPermissionChecker;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.Time;
+
+/**
+ * {@link MountTableStore} implementation that serves mount table requests
+ * through the {@link StateStoreDriver} and the cached records.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MountTableStoreImpl extends MountTableStore {
+
+  public MountTableStoreImpl(StateStoreDriver driver) {
+    super(driver);
+  }
+
+  /**
+   * Adds a mount table entry through the driver.
+   * <p>
+   * A null entry skips the permission check and is handed to the driver as
+   * is.
+   *
+   * @param request Request carrying the entry to add.
+   * @return Response carrying the status returned by the driver.
+   * @throws IOException If the permission check or the driver fails.
+   */
+  @Override
+  public AddMountTableEntryResponse addMountTableEntry(
+      AddMountTableEntryRequest request) throws IOException {
+    MountTable entry = request.getEntry();
+    if (entry != null) {
+      verifyWriteAccess(entry);
+    }
+
+    boolean added = getDriver().put(entry, false, true);
+    AddMountTableEntryResponse response =
+        AddMountTableEntryResponse.newInstance();
+    response.setStatus(added);
+    return response;
+  }
+
+  /**
+   * Updates a mount table entry through the driver.
+   * <p>
+   * A null entry skips the permission check and is handed to the driver as
+   * is.
+   *
+   * @param request Request carrying the entry to update.
+   * @return Response carrying the status returned by the driver.
+   * @throws IOException If the permission check or the driver fails.
+   */
+  @Override
+  public UpdateMountTableEntryResponse updateMountTableEntry(
+      UpdateMountTableEntryRequest request) throws IOException {
+    MountTable entry = request.getEntry();
+    if (entry != null) {
+      verifyWriteAccess(entry);
+    }
+
+    boolean updated = getDriver().put(entry, true, true);
+    UpdateMountTableEntryResponse response =
+        UpdateMountTableEntryResponse.newInstance();
+    response.setStatus(updated);
+    return response;
+  }
+
+  /**
+   * Removes the mount table entry with the requested source path.
+   * <p>
+   * The entry is looked up through the driver with a query built from the
+   * source path. If nothing is found, nothing is removed and the status is
+   * false.
+   *
+   * @param request Request carrying the source path of the entry.
+   * @return Response carrying the removal status.
+   * @throws IOException If the permission check or the driver fails.
+   */
+  @Override
+  public RemoveMountTableEntryResponse removeMountTableEntry(
+      RemoveMountTableEntryRequest request) throws IOException {
+    final String srcPath = request.getSrcPath();
+    final MountTable filter = MountTable.newInstance();
+    filter.setSourcePath(srcPath);
+    final Query<MountTable> query = new Query<>(filter);
+    final MountTable existing = getDriver().get(getRecordClass(), query);
+
+    boolean removed = false;
+    if (existing != null) {
+      verifyWriteAccess(existing);
+      removed = getDriver().remove(existing);
+    }
+
+    RemoveMountTableEntryResponse response =
+        RemoveMountTableEntryResponse.newInstance();
+    response.setStatus(removed);
+    return response;
+  }
+
+  /**
+   * Lists the cached mount table entries, sorted with
+   * {@code MountTable.SOURCE_COMPARATOR}.
+   * <p>
+   * When the request has a non-empty source path, entries whose source path
+   * does not start with it are dropped, and so are the entries for which the
+   * READ permission check fails.
+   * <p>
+   * NOTE(review): the READ permission filtering only runs when a source path
+   * is given; with a null or empty path every cached entry is returned.
+   * Confirm this is intended.
+   *
+   * @param request Request that may carry a source path prefix.
+   * @return Response with the entries and the current time as timestamp.
+   * @throws IOException Declared by the signature.
+   */
+  @Override
+  public GetMountTableEntriesResponse getMountTableEntries(
+      GetMountTableEntriesRequest request) throws IOException {
+    RouterPermissionChecker pc =
+        RouterAdminServer.getPermissionChecker();
+    // Start from every record in the cache
+    List<MountTable> records = getCachedRecords();
+
+    Collections.sort(records, MountTable.SOURCE_COMPARATOR);
+    String reqSrcPath = request.getSrcPath();
+    boolean filterByPath = reqSrcPath != null && !reqSrcPath.isEmpty();
+    if (filterByPath) {
+      for (Iterator<MountTable> it = records.iterator(); it.hasNext();) {
+        MountTable record = it.next();
+        if (!record.getSourcePath().startsWith(reqSrcPath)) {
+          // Source path does not start with the requested one
+          it.remove();
+          continue;
+        }
+        if (pc == null) {
+          continue;
+        }
+        try {
+          pc.checkPermission(record, FsAction.READ);
+        } catch (AccessControlException ignored) {
+          // The current user cannot read this entry: hide it
+          it.remove();
+        }
+      }
+    }
+
+    GetMountTableEntriesResponse response =
+        GetMountTableEntriesResponse.newInstance();
+    response.setEntries(records);
+    response.setTimestamp(Time.now());
+    return response;
+  }
+
+  /**
+   * Checks the WRITE permission on an entry if a permission checker is
+   * available; does nothing otherwise.
+   *
+   * @param entry Mount table entry to check.
+   * @throws IOException If the permission check fails.
+   */
+  private void verifyWriteAccess(MountTable entry) throws IOException {
+    RouterPermissionChecker pc = RouterAdminServer.getPermissionChecker();
+    if (pc != null) {
+      pc.checkPermission(entry, FsAction.WRITE);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java
new file mode 100644
index 0000000..d58c288
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.impl;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+
+/**
+ * {@link RouterStore} implementation that serves router registration
+ * requests through the {@link StateStoreDriver} and the cached records.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RouterStoreImpl extends RouterStore {
+
+  public RouterStoreImpl(StateStoreDriver driver) {
+    super(driver);
+  }
+
+  /**
+   * Fetches a single router registration through the driver.
+   * <p>
+   * The router identifier from the request is set as the address of a
+   * partial record, which is used as the query. A record that is found is
+   * passed to {@code overrideExpiredRecord} before being returned.
+   *
+   * @param request Request carrying the router identifier.
+   * @return Response carrying the record, or null if none was found.
+   * @throws IOException If the driver fails.
+   */
+  @Override
+  public GetRouterRegistrationResponse getRouterRegistration(
+      GetRouterRegistrationRequest request) throws IOException {
+
+    final RouterState filter = RouterState.newInstance();
+    filter.setAddress(request.getRouterId());
+    final Query<RouterState> query = new Query<>(filter);
+    RouterState found = getDriver().get(getRecordClass(), query);
+    if (found != null) {
+      overrideExpiredRecord(found);
+    }
+
+    GetRouterRegistrationResponse response =
+        GetRouterRegistrationResponse.newInstance();
+    response.setRouter(found);
+    return response;
+  }
+
+  /**
+   * Returns every cached router registration together with the timestamp
+   * reported for the cached records.
+   *
+   * @param request Request for the registrations; it is not read here.
+   * @return Response with the cached records and their timestamp.
+   * @throws IOException Declared by the signature.
+   */
+  @Override
+  public GetRouterRegistrationsResponse getRouterRegistrations(
+      GetRouterRegistrationsRequest request) throws IOException {
+
+    // Records and timestamp come from the same cache snapshot
+    QueryResult<RouterState> cached = getCachedRecordsAndTimeStamp();
+    List<RouterState> routers = cached.getRecords();
+    long cachedAt = cached.getTimestamp();
+
+    GetRouterRegistrationsResponse response =
+        GetRouterRegistrationsResponse.newInstance();
+    response.setRouters(routers);
+    response.setTimestamp(cachedAt);
+    return response;
+  }
+
+  /**
+   * Records a router heartbeat through the driver.
+   *
+   * @param request Heartbeat carrying the router record.
+   * @return Response carrying the result returned by the driver.
+   * @throws IOException If the driver fails.
+   */
+  @Override
+  public RouterHeartbeatResponse routerHeartbeat(
+      RouterHeartbeatRequest request) throws IOException {
+
+    RouterState router = request.getRouter();
+    boolean stored = getDriver().put(router, true, false);
+    return RouterHeartbeatResponse.newInstance(stored);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java
new file mode 100644
index 0000000..1a50d15
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains implementations of the state store API interfaces. All classes
+ * derive from {@link
+ * org.apache.hadoop.hdfs.server.federation.store.RecordStore}. The API
+ * definitions are contained in the
+ * org.apache.hadoop.hdfs.server.federation.store package.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.server.federation.store.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java
new file mode 100644
index 0000000..949ec7c
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The federation state store tracks persistent values that are shared between
+ * multiple routers.
+ * <p>
+ * Data is stored in data records that inherit from a common class. Data 
records
+ * are serialized when written to the data store using a modular serialization
+ * implementation. The default is protobuf serialization. Data is stored as 
rows
+ * of records of the same type with each data member in a record representing a
+ * column.
+ * <p>
+ * The state store uses a modular data storage
+ * {@link 
org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
+ * StateStoreDriver} to handle querying, updating and deleting data records. 
The
+ * data storage driver is initialized and maintained by the
+ * {@link org.apache.hadoop.hdfs.server.federation.store.
+ * StateStoreService FederationStateStoreService}. The state store
+ * supports fetching all records of a type, filtering by column values or
+ * fetching a single record by its primary key.
+ * <p>
+ * The state store contains several API interfaces, one for each data records
+ * type.
+ * <p>
+ * <ul>
+ * <li>FederationMembershipStateStore: state of all Namenodes in the 
federation.
+ * Uses the MembershipState record.
+ * <li>FederationMountTableStore: Mount table mapping paths in the global
+ * namespace to individual subcluster paths. Uses the MountTable record.
+ * <li>RouterStateStore: State of all routers in the federation. Uses the
+ * RouterState record.
+ * </ul>
+ * <p>
+ * Each API is defined in a separate interface. The implementations of these
+ * interfaces are responsible for accessing the
+ * {@link 
org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
+ * StateStoreDriver} to query, update and delete data records.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java
new file mode 100644
index 0000000..2d9f102
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import 
org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+
+/**
+ * API request for adding a mount table entry to the state store.
+ */
+public abstract class AddMountTableEntryRequest {
+
+  /**
+   * Creates a new request instance through the
+   * {@link StateStoreSerializer}.
+   *
+   * @return New request.
+   */
+  public static AddMountTableEntryRequest newInstance() {
+    return StateStoreSerializer.newRecord(AddMountTableEntryRequest.class);
+  }
+
+  /**
+   * Creates a new request instance and sets its entry.
+   *
+   * @param newEntry Mount table entry to set on the request.
+   * @return New request carrying the entry.
+   */
+  public static AddMountTableEntryRequest newInstance(MountTable newEntry) {
+    AddMountTableEntryRequest populated = newInstance();
+    populated.setEntry(newEntry);
+    return populated;
+  }
+
+  /**
+   * @return Mount table entry carried by this request.
+   */
+  @Public
+  @Unstable
+  public abstract MountTable getEntry();
+
+  /**
+   * @param mount Mount table entry to carry in this request.
+   */
+  @Public
+  @Unstable
+  public abstract void setEntry(MountTable mount);
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to