http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
deleted file mode 100644
index 1bd35f2..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
-
-import static 
org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
-import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
-import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
-import org.apache.hadoop.hdfs.server.federation.store.records.Query;
-import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
-
-/**
- * Skeleton State Store driver that supplies default versions of the optional
- * operations. The defaults are layered on the bulk operations of the driver
- * ({@code get(Class)}, {@code putAll} and {@code remove(Class, Query)}) and use
- * an uncached read/write all algorithm, so overriding them is recommended in
- * most cases.
- * <p>
- * Drivers may optionally override additional routines for performance
- * optimization, such as custom get/put/remove queries, depending on the
- * capabilities of the data store.
- */
-public abstract class StateStoreBaseImpl extends StateStoreDriver {
-
-  /**
-   * Fetch the single record matching a query.
-   *
-   * @param clazz Class of the record to fetch.
-   * @param query Query used to filter the records.
-   * @return The matching record, or null if nothing matches.
-   * @throws IOException If more than one record matches or the records cannot
-   *           be fetched.
-   */
-  @Override
-  public <T extends BaseRecord> T get(
-      Class<T> clazz, Query<T> query) throws IOException {
-    final List<T> matches = getMultiple(clazz, query);
-    final int count = matches.size();
-    if (count == 0) {
-      return null;
-    }
-    if (count == 1) {
-      return matches.get(0);
-    }
-    throw new IOException("Found more than one object in collection");
-  }
-
-  /**
-   * Fetch every record matching a query by reading all the records of the
-   * class and filtering them.
-   *
-   * @param clazz Class of the records to fetch.
-   * @param query Query used to filter the records.
-   * @return The matching records.
-   * @throws IOException If the records cannot be fetched or filtered.
-   */
-  @Override
-  public <T extends BaseRecord> List<T> getMultiple(
-      Class<T> clazz, Query<T> query) throws IOException  {
-    final QueryResult<T> everything = get(clazz);
-    final List<T> filtered = filterMultiple(query, everything.getRecords());
-    if (filtered != null) {
-      return filtered;
-    }
-    throw new IOException("Cannot fetch records from the store");
-  }
-
-  /**
-   * Store a single record by delegating to the bulk {@code putAll}.
-   *
-   * @param record Record to store.
-   * @param allowUpdate Forwarded to {@code putAll}.
-   * @param errorIfExists Forwarded to {@code putAll}.
-   * @return The result of {@code putAll} for the one-element list.
-   * @throws IOException If the underlying {@code putAll} fails.
-   */
-  @Override
-  public <T extends BaseRecord> boolean put(
-      T record, boolean allowUpdate, boolean errorIfExists)
-          throws IOException {
-    final List<T> batch = new ArrayList<>();
-    batch.add(record);
-    return putAll(batch, allowUpdate, errorIfExists);
-  }
-
-  /**
-   * Remove a single record by querying for it.
-   *
-   * @param record Record to remove.
-   * @return If exactly one record was removed.
-   * @throws IOException If the underlying removal fails.
-   */
-  @Override
-  public <T extends BaseRecord> boolean remove(T record) throws IOException {
-    final Query<T> byRecord = new Query<T>(record);
-    @SuppressWarnings("unchecked")
-    final Class<T> recordClass =
-        (Class<T>) StateStoreUtils.getRecordClass(record.getClass());
-    final int removed = remove(recordClass, byRecord);
-    return removed == 1;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
deleted file mode 100644
index 6638d1c..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
+++ /dev/null
@@ -1,462 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
-
-import static 
org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
-import static org.apache.hadoop.util.Time.monotonicNow;
-import static org.apache.hadoop.util.Time.now;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
-import 
org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
-import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
-import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
-import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
-import org.apache.hadoop.hdfs.server.federation.store.records.Query;
-import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * {@link StateStoreDriver} implementation based on files. In this approach, we
- * use temporary files for the writes and renaming "atomically" to the final
- * value. Instead of writing to the final location, it will go to a temporary
- * one and then rename to the final destination.
- */
-public abstract class StateStoreFileBaseImpl
-    extends StateStoreSerializableImpl {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(StateStoreFileBaseImpl.class);
-
-  /** File extension for temporary files. */
-  private static final String TMP_MARK = ".tmp";
-  /** We remove temporary files older than 10 seconds. */
-  private static final long OLD_TMP_RECORD_MS = TimeUnit.SECONDS.toMillis(10);
-  /** File pattern for temporary records: file.XYZ.tmp. */
-  private static final Pattern OLD_TMP_RECORD_PATTERN =
-      Pattern.compile(".+\\.(\\d+)\\.tmp");
-
-  /** If it is initialized. */
-  private boolean initialized = false;
-
-
-  /**
-   * Get the reader of a record for the file system.
-   * <p>
-   * NOTE(review): the concrete drivers that accompany this class return
-   * {@code null} when the stream cannot be opened instead of throwing, so
-   * callers need to check the result.
-   *
-   * @param path Path of the record to read.
-   * @return Reader for the record.
-   */
-  protected abstract <T extends BaseRecord> BufferedReader getReader(
-      String path);
-
-  /**
-   * Get the writer of a record for the file system.
-   * <p>
-   * NOTE(review): the concrete drivers that accompany this class return
-   * {@code null} when the stream cannot be opened instead of throwing, so
-   * callers need to check the result.
-   *
-   * @param path Path of the record to write.
-   * @return Writer for the record.
-   */
-  protected abstract <T extends BaseRecord> BufferedWriter getWriter(
-      String path);
-
-  /**
-   * Check if a path exists.
-   *
-   * @param path Path to check.
-   * @return If the path exists.
-   */
-  protected abstract boolean exists(String path);
-
-  /**
-   * Make a directory.
-   *
-   * @param path Path of the directory to create.
-   * @return If the directory was created.
-   */
-  protected abstract boolean mkdir(String path);
-
-  /**
-   * Rename a file. This should be atomic. It is used to commit a temporary
-   * record file into its final location.
-   *
-   * @param src Source name.
-   * @param dst Destination name.
-   * @return If the rename was successful.
-   */
-  protected abstract boolean rename(String src, String dst);
-
-  /**
-   * Remove a file.
-   *
-   * @param path Path for the file to remove
-   * @return If the file was removed.
-   */
-  protected abstract boolean remove(String path);
-
-  /**
-   * Get the children for a path. This class appends each returned entry to
-   * the parent path with a "/" separator, so entries are expected to be
-   * names relative to {@code path} rather than full paths.
-   *
-   * @param path Path to check.
-   * @return List of children.
-   */
-  protected abstract List<String> getChildren(String path);
-
-  /**
-   * Get root directory. A {@code null} result is treated by
-   * {@code initDriver()} as an invalid root directory.
-   *
-   * @return Root directory.
-   */
-  protected abstract String getRootDir();
-
-  /**
-   * Record whether this driver has been initialized. The stored flag is the
-   * value reported by {@code isDriverReady()}.
-   *
-   * @param ini True if the driver is initialized, false otherwise.
-   */
-  public void setInitialized(boolean ini) {
-    initialized = ini;
-  }
-
-  /**
-   * Initialize the driver by making sure the root directory is usable. The
-   * directory is created when it does not exist yet; on success the driver is
-   * marked as initialized.
-   *
-   * @return If the driver was initialized.
-   */
-  @Override
-  public boolean initDriver() {
-    final String root = getRootDir();
-    if (root == null) {
-      LOG.error("Invalid root directory, unable to initialize driver.");
-      return false;
-    }
-
-    try {
-      // Create the root path if it is missing
-      final boolean available = exists(root) || mkdir(root);
-      if (!available) {
-        LOG.error("Cannot create State Store root directory {}", root);
-        return false;
-      }
-    } catch (Exception ex) {
-      LOG.error(
-          "Cannot initialize filesystem using root directory {}", root, ex);
-      return false;
-    }
-
-    setInitialized(true);
-    return true;
-  }
-
-  /**
-   * Make sure the data directory for a record type exists under the root
-   * directory, creating it if needed.
-   *
-   * @param className Name of the record type; used as the directory name.
-   * @param recordClass Class of the record (not used by this implementation).
-   * @return If the data directory exists or was created.
-   */
-  @Override
-  public <T extends BaseRecord> boolean initRecordStorage(
-      String className, Class<T> recordClass) {
-
-    final String recordDir = getRootDir() + "/" + className;
-    boolean ready = true;
-    try {
-      if (!exists(recordDir)) {
-        LOG.info("{} data directory doesn't exist, creating it", recordDir);
-        ready = mkdir(recordDir);
-        if (!ready) {
-          LOG.error("Cannot create data directory {}", recordDir);
-        }
-      }
-    } catch (Exception ex) {
-      LOG.error("Cannot create data directory {}", recordDir, ex);
-      ready = false;
-    }
-    return ready;
-  }
-
-  /**
-   * Read all the records of a class. Every child of the class directory that
-   * is not a temporary file is parsed as one record. Temporary files are
-   * skipped, and the old ones are removed along the way.
-   *
-   * @param clazz Class of the records to read.
-   * @return The records together with the time they were read.
-   * @throws IOException If the driver is not ready or any record cannot be
-   *           fetched.
-   */
-  @Override
-  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
-      throws IOException {
-    verifyDriverReady();
-    final long begin = monotonicNow();
-    final StateStoreMetrics metrics = getMetrics();
-    final List<T> records = new ArrayList<>();
-    try {
-      final String classPath = getPathForClass(clazz);
-      for (String name : getChildren(classPath)) {
-        final String childPath = classPath + "/" + name;
-        if (!name.endsWith(TMP_MARK)) {
-          records.add(getRecord(childPath, clazz));
-          continue;
-        }
-        // Leftover from a write; only clean it up once it is old enough
-        LOG.debug("There is a temporary file {} in {}", name, classPath);
-        if (isOldTempRecord(name)) {
-          LOG.warn("Removing {} as it's an old temporary record", name);
-          remove(childPath);
-        }
-      }
-    } catch (Exception e) {
-      if (metrics != null) {
-        metrics.addFailure(monotonicNow() - begin);
-      }
-      final String msg = "Cannot fetch records for " + clazz.getSimpleName();
-      LOG.error(msg, e);
-      throw new IOException(msg, e);
-    }
-
-    if (metrics != null) {
-      metrics.addRead(monotonicNow() - begin);
-    }
-    return new QueryResult<T>(records, getTime());
-  }
-
-  /**
-   * Check if a record is temporary and old. Temporary records are named
-   * {@code <record>.<creation time in ms>.tmp}; one is old once its creation
-   * time is more than {@code OLD_TMP_RECORD_MS} in the past.
-   * <p>
-   * Names that are null, do not follow the pattern, or carry a timestamp that
-   * does not fit in a long are reported as not old, so that a single oddly
-   * named file cannot make every read of the record type fail.
-   *
-   * @param pathRecord Path for the record to check.
-   * @return If the record is temporary and old.
-   */
-  @VisibleForTesting
-  public static boolean isOldTempRecord(final String pathRecord) {
-    if (pathRecord == null || !pathRecord.endsWith(TMP_MARK)) {
-      return false;
-    }
-    // Extract temporary record creation time
-    Matcher m = OLD_TMP_RECORD_PATTERN.matcher(pathRecord);
-    if (!m.find()) {
-      return false;
-    }
-    final long time;
-    try {
-      time = Long.parseLong(m.group(1));
-    } catch (NumberFormatException e) {
-      // \d+ accepts digit runs that overflow a long; do not remove a file
-      // whose age cannot be determined
-      LOG.debug("Cannot parse the creation time of temporary record {}",
-          pathRecord);
-      return false;
-    }
-    return now() - time > OLD_TMP_RECORD_MS;
-  }
-
-  /**
-   * Read a record from a file. The record is built from the first line that
-   * is neither empty nor a comment (starting with "#") and that can be
-   * parsed; lines that fail to parse are logged and skipped.
-   *
-   * @param path Path to the file containing the record.
-   * @param clazz Class of the record.
-   * @return Record read from the file.
-   * @throws IOException If the file cannot be opened or read, or if it
-   *           contains no parseable record.
-   */
-  private <T extends BaseRecord> T getRecord(
-      final String path, final Class<T> clazz) throws IOException {
-    // try-with-resources skips close() for a null resource
-    try (BufferedReader reader = getReader(path)) {
-      if (reader == null) {
-        // getReader() reports failures by returning null; fail explicitly
-        // instead of hitting a NullPointerException on readLine()
-        throw new IOException("Cannot open " + path + " for record " +
-            clazz.getSimpleName());
-      }
-      String line;
-      while ((line = reader.readLine()) != null) {
-        if (!line.startsWith("#") && line.length() > 0) {
-          try {
-            return newRecord(line, clazz, false);
-          } catch (Exception ex) {
-            // Keep looking; a later line may still hold a valid record
-            LOG.error("Cannot parse line {} in file {}", line, path, ex);
-          }
-        }
-      }
-    }
-    throw new IOException("Cannot read " + path + " for record " +
-        clazz.getSimpleName());
-  }
-
-  /**
-   * Build the directory path that holds the records of a class: the root
-   * directory followed by the record name, separated by exactly one "/".
-   *
-   * @param clazz Class of the record.
-   * @return Path for this record class.
-   */
-  private <T extends BaseRecord> String getPathForClass(final Class<T> clazz) {
-    final String recordName = StateStoreUtils.getRecordName(clazz);
-    final String root = String.valueOf(getRootDir());
-    final boolean endsWithSlash = root.charAt(root.length() - 1) == '/';
-    return endsWithSlash ? root + recordName : root + "/" + recordName;
-  }
-
-  /**
-   * Report whether the driver is ready, i.e. whether it has been flagged as
-   * initialized through {@code setInitialized(boolean)}.
-   *
-   * @return If the driver is initialized.
-   */
-  @Override
-  public boolean isDriverReady() {
-    return initialized;
-  }
-
-  /**
-   * Write a list of records. Each record is first written to a temporary file
-   * and then renamed into its final location, so readers never see a
-   * partially written record.
-   * <p>
-   * A record that already exists is overwritten when {@code allowUpdate} is
-   * set; otherwise it aborts the whole operation when {@code errorIfExists}
-   * is set, or is silently skipped when it is not. A record whose temporary
-   * file could not be fully written is not committed: the previous version,
-   * if any, is left in place.
-   *
-   * @param records Records to write.
-   * @param allowUpdate If existing records can be overwritten.
-   * @param errorIfExists If finding an existing record (that cannot be
-   *          updated) is an error.
-   * @return If all the records to write were written and committed.
-   * @throws StateStoreUnavailableException If the driver is not ready.
-   */
-  @Override
-  public <T extends BaseRecord> boolean putAll(
-      List<T> records, boolean allowUpdate, boolean errorIfExists)
-          throws StateStoreUnavailableException {
-    verifyDriverReady();
-    if (records.isEmpty()) {
-      return true;
-    }
-
-    long start = monotonicNow();
-    StateStoreMetrics metrics = getMetrics();
-
-    // Check if any record exists
-    Map<String, T> toWrite = new HashMap<>();
-    for (T record : records) {
-      Class<? extends BaseRecord> recordClass = record.getClass();
-      String path = getPathForClass(recordClass);
-      String primaryKey = getPrimaryKey(record);
-      String recordPath = path + "/" + primaryKey;
-
-      if (exists(recordPath)) {
-        if (allowUpdate) {
-          // Update the mod time stamp. Many backends will use their
-          // own timestamp for the mod time.
-          record.setDateModified(this.getTime());
-          toWrite.put(recordPath, record);
-        } else if (errorIfExists) {
-          LOG.error("Attempt to insert record {} that already exists",
-              recordPath);
-          if (metrics != null) {
-            metrics.addFailure(monotonicNow() - start);
-          }
-          return false;
-        } else  {
-          LOG.debug("Not updating {}", record);
-        }
-      } else {
-        toWrite.put(recordPath, record);
-      }
-    }
-
-    // Write the records
-    boolean success = true;
-    for (Entry<String, T> entry : toWrite.entrySet()) {
-      String recordPath = entry.getKey();
-      String recordPathTemp = recordPath + "." + now() + TMP_MARK;
-      if (!writeTempRecord(recordPathTemp, entry.getValue())) {
-        // Never commit a missing or truncated temporary file over the
-        // record. Any leftover temporary file is cleaned up by get() once
-        // it is old enough.
-        success = false;
-        continue;
-      }
-      // Commit
-      if (!rename(recordPathTemp, recordPath)) {
-        LOG.error("Failed committing record into {}", recordPath);
-        success = false;
-      }
-    }
-
-    long end = monotonicNow();
-    if (metrics != null) {
-      if (success) {
-        metrics.addWrite(end - start);
-      } else {
-        metrics.addFailure(end - start);
-      }
-    }
-    return success;
-  }
-
-  /**
-   * Serialize a record into a temporary file.
-   *
-   * @param recordPathTemp Path of the temporary file to write.
-   * @param record Record to serialize.
-   * @return If the record was completely written and the writer was closed
-   *         without errors.
-   */
-  private <T extends BaseRecord> boolean writeTempRecord(
-      final String recordPathTemp, final T record) {
-    BufferedWriter writer = getWriter(recordPathTemp);
-    if (writer == null) {
-      // getWriter() reports failures by returning null
-      LOG.error("Cannot open a writer for {}", recordPathTemp);
-      return false;
-    }
-    boolean written = true;
-    try {
-      String line = serializeString(record);
-      writer.write(line);
-    } catch (IOException e) {
-      LOG.error("Cannot write {}", recordPathTemp, e);
-      written = false;
-    } finally {
-      try {
-        // Closing flushes the buffer, so a failure here means the data may
-        // not have reached the file
-        writer.close();
-      } catch (IOException e) {
-        LOG.error("Cannot close the writer for {}", recordPathTemp, e);
-        written = false;
-      }
-    }
-    return written;
-  }
-
-  /**
-   * Remove the records of a class that match a query. All the records are
-   * read and filtered with the query, and the file of each match is removed.
-   * Failures are logged and counted in the metrics but not thrown.
-   *
-   * @param clazz Class of the records to remove.
-   * @param query Query selecting the records to remove; nothing is removed
-   *          for a null query.
-   * @return Number of records removed.
-   * @throws StateStoreUnavailableException If the driver is not ready.
-   */
-  @Override
-  public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
-      throws StateStoreUnavailableException {
-    verifyDriverReady();
-
-    if (query == null) {
-      return 0;
-    }
-
-    long start = monotonicNow();
-    StateStoreMetrics metrics = getMetrics();
-    int removed = 0;
-    // Get the current records
-    try {
-      final QueryResult<T> result = get(clazz);
-      final List<T> existingRecords = result.getRecords();
-      final List<T> recordsToRemove = filterMultiple(query, existingRecords);
-      if (recordsToRemove == null) {
-        // Same condition getMultiple() treats as a fetch failure
-        throw new IOException("Cannot fetch records from the store");
-      }
-      // The directory is the same for every record of the class
-      final String path = getPathForClass(clazz);
-      boolean success = true;
-      for (T recordToRemove : recordsToRemove) {
-        String primaryKey = getPrimaryKey(recordToRemove);
-        String recordToRemovePath = path + "/" + primaryKey;
-        if (remove(recordToRemovePath)) {
-          removed++;
-        } else {
-          LOG.error("Cannot remove record {}", recordToRemovePath);
-          success = false;
-        }
-      }
-      if (!success) {
-        LOG.error("Cannot remove records {} query {}", clazz, query);
-        if (metrics != null) {
-          metrics.addFailure(monotonicNow() - start);
-        }
-      }
-    } catch (IOException e) {
-      LOG.error("Cannot remove records {} query {}", clazz, query, e);
-      if (metrics != null) {
-        metrics.addFailure(monotonicNow() - start);
-      }
-    }
-
-    if (removed > 0 && metrics != null) {
-      metrics.addRemove(monotonicNow() - start);
-    }
-    return removed;
-  }
-
-  /**
-   * Remove every child of the directory of a record class. All the children
-   * are attempted even if some of them cannot be removed.
-   *
-   * @param clazz Class of the records to remove.
-   * @return If every child was removed.
-   * @throws StateStoreUnavailableException If the driver is not ready.
-   */
-  @Override
-  public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
-      throws StateStoreUnavailableException {
-    verifyDriverReady();
-    final long begin = Time.monotonicNow();
-    final StateStoreMetrics metrics = getMetrics();
-
-    final String classPath = getPathForClass(clazz);
-    boolean allRemoved = true;
-    for (String name : getChildren(classPath)) {
-      // Non-short-circuit: keep removing after a failure
-      allRemoved &= remove(classPath + "/" + name);
-    }
-
-    if (metrics != null) {
-      final long elapsed = Time.monotonicNow() - begin;
-      if (allRemoved) {
-        metrics.addRemove(elapsed);
-      } else {
-        metrics.addFailure(elapsed);
-      }
-    }
-    return allRemoved;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
deleted file mode 100644
index c585a23..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.io.Files;
-
-/**
- * StateStoreDriver implementation that keeps the records in files of the
- * local file system, accessed through {@link File}.
- */
-public class StateStoreFileImpl extends StateStoreFileBaseImpl {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(StateStoreFileImpl.class);
-
-  /** Configuration keys. */
-  public static final String FEDERATION_STORE_FILE_DIRECTORY =
-      DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.file.directory";
-
-  /** Root directory for the state store; resolved on first use. */
-  private String rootDirectory;
-
-
-  /** Check if a local path exists. */
-  @Override
-  protected boolean exists(String path) {
-    return new File(path).exists();
-  }
-
-  /** Create a local directory, including any missing parents. */
-  @Override
-  protected boolean mkdir(String path) {
-    return new File(path).mkdirs();
-  }
-
-  /** Move a local file; a failure is logged and reported as false. */
-  @Override
-  protected boolean rename(String src, String dst) {
-    final File from = new File(src);
-    final File to = new File(dst);
-    try {
-      Files.move(from, to);
-    } catch (IOException e) {
-      LOG.error("Cannot rename {} to {}", src, dst, e);
-      return false;
-    }
-    return true;
-  }
-
-  /** Delete a local file. */
-  @Override
-  protected boolean remove(String path) {
-    return new File(path).delete();
-  }
-
-  /**
-   * Get the root directory. It is taken from the configuration the first
-   * time; when the key is not set, a new temporary directory is used
-   * instead.
-   */
-  @Override
-  protected String getRootDir() {
-    if (rootDirectory != null) {
-      return rootDirectory;
-    }
-    String configured = getConf().get(FEDERATION_STORE_FILE_DIRECTORY);
-    if (configured == null) {
-      configured = Files.createTempDir().getAbsolutePath();
-      LOG.warn("The root directory is not available, using {}", configured);
-    }
-    rootDirectory = configured;
-    return rootDirectory;
-  }
-
-  /**
-   * Open a UTF-8 reader for a local file.
-   *
-   * @return The reader, or null if the file cannot be opened.
-   */
-  @Override
-  protected <T extends BaseRecord> BufferedReader getReader(String filename) {
-    try {
-      LOG.debug("Loading file: {}", filename);
-      final FileInputStream in = new FileInputStream(new File(filename));
-      return new BufferedReader(
-          new InputStreamReader(in, StandardCharsets.UTF_8));
-    } catch (Exception ex) {
-      LOG.error("Cannot open read stream for record {}", filename, ex);
-      return null;
-    }
-  }
-
-  /**
-   * Open a UTF-8 writer for a local file, truncating any previous content.
-   *
-   * @return The writer, or null if the file cannot be opened.
-   */
-  @Override
-  protected <T extends BaseRecord> BufferedWriter getWriter(String filename) {
-    try {
-      LOG.debug("Writing file: {}", filename);
-      final FileOutputStream out =
-          new FileOutputStream(new File(filename), false);
-      return new BufferedWriter(
-          new OutputStreamWriter(out, StandardCharsets.UTF_8));
-    } catch (IOException e) {
-      LOG.error("Cannot open write stream for record {}", filename, e);
-      return null;
-    }
-  }
-
-  /** Mark the driver as no longer initialized. */
-  @Override
-  public void close() throws Exception {
-    setInitialized(false);
-  }
-
-  /**
-   * List the names of the entries of a local directory.
-   *
-   * @return The entry names; empty if the directory cannot be listed.
-   */
-  @Override
-  protected List<String> getChildren(String path) {
-    final List<String> names = new LinkedList<>();
-    final File[] entries = new File(path).listFiles();
-    if (entries == null) {
-      return names;
-    }
-    for (File entry : entries) {
-      names.add(entry.getName());
-    }
-    return names;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
deleted file mode 100644
index 8d6c626..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * StateStoreDriver implementation based on a filesystem. The most common uses
- * HDFS as a backend.
- */
-public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(StateStoreFileSystemImpl.class);
-
-
-  /** Configuration keys. */
-  public static final String FEDERATION_STORE_FS_PATH =
-      DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.fs.path";
-
-  /** File system to back the State Store; set by {@link #getRootDir()}. */
-  private FileSystem fs;
-  /** Working path in the filesystem. */
-  private String workPath;
-
-  /**
-   * Check if a path exists.
-   *
-   * @param path Path to check.
-   * @return If the path exists; false also when the check itself fails.
-   */
-  @Override
-  protected boolean exists(String path) {
-    try {
-      return fs.exists(new Path(path));
-    } catch (IOException e) {
-      // Keep reporting "not found", but leave a trace of the real problem
-      LOG.error("Cannot check if {} exists", path, e);
-      return false;
-    }
-  }
-
-  /**
-   * Make a directory.
-   *
-   * @param path Path of the directory to create.
-   * @return If the directory was created.
-   */
-  @Override
-  protected boolean mkdir(String path) {
-    try {
-      return fs.mkdirs(new Path(path));
-    } catch (IOException e) {
-      LOG.error("Cannot create directory {}", path, e);
-      return false;
-    }
-  }
-
-  /**
-   * Rename a file, replacing the destination if it exists. On a
-   * {@link DistributedFileSystem} the overwrite is delegated to the file
-   * system; on any other file system the destination is deleted first, so
-   * the replacement is not atomic there.
-   *
-   * @param src Source name.
-   * @param dst Destination name.
-   * @return If the rename was successful.
-   */
-  @Override
-  protected boolean rename(String src, String dst) {
-    try {
-      if (fs instanceof DistributedFileSystem) {
-        DistributedFileSystem dfs = (DistributedFileSystem)fs;
-        dfs.rename(new Path(src), new Path(dst), Options.Rename.OVERWRITE);
-        return true;
-      } else {
-        // Replace should be atomic but not available
-        if (fs.exists(new Path(dst))) {
-          fs.delete(new Path(dst), true);
-        }
-        return fs.rename(new Path(src), new Path(dst));
-      }
-    } catch (Exception e) {
-      LOG.error("Cannot rename {} to {}", src, dst, e);
-      return false;
-    }
-  }
-
-  /**
-   * Remove a path recursively.
-   *
-   * @param path Path to remove.
-   * @return If the path was removed.
-   */
-  @Override
-  protected boolean remove(String path) {
-    try {
-      return fs.delete(new Path(path), true);
-    } catch (Exception e) {
-      LOG.error("Cannot remove {}", path, e);
-      return false;
-    }
-  }
-
-  /**
-   * Get the root directory. The first successful call reads the path from
-   * the configuration and obtains the file system that serves it.
-   *
-   * @return Root directory, or null if it is not configured or its file
-   *         system cannot be obtained.
-   */
-  @Override
-  protected String getRootDir() {
-    if (this.workPath == null) {
-      String rootPath = getConf().get(FEDERATION_STORE_FS_PATH);
-      if (rootPath == null) {
-        // Fail explicitly instead of relying on a caught NPE from new URI()
-        LOG.error("The State Store path is not set in {}",
-            FEDERATION_STORE_FS_PATH);
-        return null;
-      }
-      try {
-        URI workUri = new URI(rootPath);
-        fs = FileSystem.get(workUri, getConf());
-      } catch (Exception ex) {
-        LOG.error("Cannot get the file system for {}", rootPath, ex);
-        return null;
-      }
-      this.workPath = rootPath;
-    }
-    return this.workPath;
-  }
-
-  /**
-   * Close the file system and mark the driver as not initialized, so that
-   * later operations are rejected as "driver not ready" instead of failing
-   * on a closed file system.
-   */
-  @Override
-  public void close() throws Exception {
-    setInitialized(false);
-    if (fs != null) {
-      fs.close();
-    }
-  }
-
-  /**
-   * Open a UTF-8 reader for a path.
-   *
-   * @param pathName Path of the record to read.
-   * @return Reader for the record, or null if it cannot be opened.
-   */
-  @Override
-  protected <T extends BaseRecord> BufferedReader getReader(String pathName) {
-    BufferedReader reader = null;
-    Path path = new Path(pathName);
-    try {
-      FSDataInputStream fdis = fs.open(path);
-      InputStreamReader isr =
-          new InputStreamReader(fdis, StandardCharsets.UTF_8);
-      reader = new BufferedReader(isr);
-    } catch (IOException ex) {
-      LOG.error("Cannot open read stream for {}", path, ex);
-    }
-    return reader;
-  }
-
-  /**
-   * Open a UTF-8 writer for a path, overwriting any existing file.
-   *
-   * @param pathName Path of the record to write.
-   * @return Writer for the record, or null if it cannot be opened.
-   */
-  @Override
-  protected <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
-    BufferedWriter writer = null;
-    Path path = new Path(pathName);
-    try {
-      FSDataOutputStream fdos = fs.create(path, true);
-      OutputStreamWriter osw =
-          new OutputStreamWriter(fdos, StandardCharsets.UTF_8);
-      writer = new BufferedWriter(osw);
-    } catch (IOException ex) {
-      LOG.error("Cannot open write stream for {}", path, ex);
-    }
-    return writer;
-  }
-
-  /**
-   * List the names of the entries under a path.
-   *
-   * @param pathName Path to list, resolved against the working path.
-   * @return Names of the children; empty if the path cannot be listed.
-   */
-  @Override
-  protected List<String> getChildren(String pathName) {
-    List<String> ret = new LinkedList<>();
-    try {
-      // Built inside the try so an invalid path is reported like any other
-      // listing failure
-      Path path = new Path(workPath, pathName);
-      FileStatus[] files = fs.listStatus(path);
-      for (FileStatus file : files) {
-        Path filePath = file.getPath();
-        String fileName = filePath.getName();
-        ret.add(fileName);
-      }
-    } catch (Exception e) {
-      LOG.error("Cannot get children for {}", pathName, e);
-    }
-    return ret;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
deleted file mode 100644
index 7bc93de..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
-import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
-import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
-
-/**
- * State Store driver that stores a serialization of the records. The serializer
- * is pluggable.
- */
-public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl {
-
-  /** Mark for slashes in path names. */
-  protected static final String SLASH_MARK = "0SLASH0";
-  /** Mark for colon in path names. */
-  protected static final String COLON_MARK = "_";
-
-  /** Default serializer for this driver. */
-  private StateStoreSerializer serializer;
-
-
-  @Override
-  public boolean init(final Configuration config, final String id,
-      final Collection<Class<? extends BaseRecord>> records,
-      final StateStoreMetrics metrics) {
-    boolean ret = super.init(config, id, records, metrics);
-
-    this.serializer = StateStoreSerializer.getSerializer(config);
-
-    return ret;
-  }
-
-  /**
-   * Serialize a record using the serializer.
-   * @param record Record to serialize.
-   * @return Byte array with the serialization of the record.
-   */
-  protected <T extends BaseRecord> byte[] serialize(T record) {
-    return serializer.serialize(record);
-  }
-
-  /**
-   * Serialize a record using the serializer.
-   * @param record Record to serialize.
-   * @return String with the serialization of the record.
-   */
-  protected <T extends BaseRecord> String serializeString(T record) {
-    return serializer.serializeString(record);
-  }
-
-  /**
-   * Creates a record from an input data string.
-   * @param data Serialized text of the record.
-   * @param clazz Record class.
-   * @param includeDates If dateModified and dateCreated are serialized.
-   * @return The created record.
-   * @throws IOException
-   */
-  protected <T extends BaseRecord> T newRecord(
-      String data, Class<T> clazz, boolean includeDates) throws IOException {
-    return serializer.deserialize(data, clazz);
-  }
-
-  /**
-   * Get the primary key for a record. If we don't want to store in folders, we
-   * need to remove / from the name.
-   *
-   * @param record Record to get the primary key for.
-   * @return Primary key for the record.
-   */
-  protected static String getPrimaryKey(BaseRecord record) {
-    String primaryKey = record.getPrimaryKey();
-    primaryKey = primaryKey.replaceAll("/", SLASH_MARK);
-    primaryKey = primaryKey.replaceAll(":", COLON_MARK);
-    return primaryKey;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java
deleted file mode 100644
index 45c5dd6..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
-
-import java.io.IOException;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.codec.binary.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
-import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
-import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.google.protobuf.Message;
-
-/**
- * Protobuf implementation of the State Store serializer.
- */
-public final class StateStoreSerializerPBImpl extends StateStoreSerializer {
-
-  private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb";
-  private static final String PB_IMPL_CLASS_SUFFIX = "PBImpl";
-
-  private Configuration localConf = new Configuration();
-
-
-  private StateStoreSerializerPBImpl() {
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public <T> T newRecordInstance(Class<T> clazz) {
-    try {
-      String clazzPBImpl = getPBImplClassName(clazz);
-      Class<?> pbClazz = localConf.getClassByName(clazzPBImpl);
-      Object retObject = ReflectionUtils.newInstance(pbClazz, localConf);
-      return (T)retObject;
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private String getPBImplClassName(Class<?> clazz) {
-    String srcPackagePart = getPackageName(clazz);
-    String srcClassName = getClassName(clazz);
-    String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX;
-    String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX;
-    return destPackagePart + "." + destClassPart;
-  }
-
-  private String getClassName(Class<?> clazz) {
-    String fqName = clazz.getName();
-    return (fqName.substring(fqName.lastIndexOf(".") + 1, fqName.length()));
-  }
-
-  private String getPackageName(Class<?> clazz) {
-    return clazz.getPackage().getName();
-  }
-
-  @Override
-  public byte[] serialize(BaseRecord record) {
-    byte[] byteArray64 = null;
-    if (record instanceof PBRecord) {
-      PBRecord recordPB = (PBRecord) record;
-      Message msg = recordPB.getProto();
-      byte[] byteArray = msg.toByteArray();
-      byteArray64 = Base64.encodeBase64(byteArray, false);
-    }
-    return byteArray64;
-  }
-
-  @Override
-  public String serializeString(BaseRecord record) {
-    byte[] byteArray64 = serialize(record);
-    String base64Encoded = StringUtils.newStringUtf8(byteArray64);
-    return base64Encoded;
-  }
-
-  @Override
-  public <T extends BaseRecord> T deserialize(
-      byte[] byteArray, Class<T> clazz) throws IOException {
-
-    T record = newRecord(clazz);
-    if (record instanceof PBRecord) {
-      PBRecord pbRecord = (PBRecord) record;
-      byte[] byteArray64 = Base64.encodeBase64(byteArray, false);
-      String base64Encoded = StringUtils.newStringUtf8(byteArray64);
-      pbRecord.readInstance(base64Encoded);
-    }
-    return record;
-  }
-
-  @Override
-  public <T extends BaseRecord> T deserialize(String data, Class<T> clazz)
-      throws IOException {
-    byte[] byteArray64 = Base64.decodeBase64(data);
-    return deserialize(byteArray64, clazz);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
deleted file mode 100644
index 7f98171..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
-
-import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
-import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordName;
-import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
-import static org.apache.hadoop.util.Time.monotonicNow;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
-import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
-import org.apache.hadoop.hdfs.server.federation.store.records.Query;
-import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
-import org.apache.hadoop.util.curator.ZKCuratorManager;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link StateStoreDriver} driver implementation that uses ZooKeeper as a
- * backend.
- * <p>
- * The structure of the znodes in the ensemble is:
- * PARENT_PATH
- * |--- MOUNT
- * |--- MEMBERSHIP
- * |--- REBALANCER
- * |--- ROUTERS
- */
-public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(StateStoreZooKeeperImpl.class);
-
-
-  /** Configuration keys. */
-  public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX =
-      DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk.";
-  public static final String FEDERATION_STORE_ZK_PARENT_PATH =
-      FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path";
-  public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT =
-      "/hdfs-federation";
-
-
-  /** Directory to store the state store data. */
-  private String baseZNode;
-
-  /** Interface to ZooKeeper. */
-  private ZKCuratorManager zkManager;
-
-
-  @Override
-  public boolean initDriver() {
-    LOG.info("Initializing ZooKeeper connection");
-
-    Configuration conf = getConf();
-    baseZNode = conf.get(
-        FEDERATION_STORE_ZK_PARENT_PATH,
-        FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
-    try {
-      this.zkManager = new ZKCuratorManager(conf);
-      this.zkManager.start();
-    } catch (IOException e) {
-      LOG.error("Cannot initialize the ZK connection", e);
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public <T extends BaseRecord> boolean initRecordStorage(
-      String className, Class<T> clazz) {
-    try {
-      String checkPath = getNodePath(baseZNode, className);
-      zkManager.createRootDirRecursively(checkPath);
-      return true;
-    } catch (Exception e) {
-      LOG.error("Cannot initialize ZK node for {}: {}",
-          className, e.getMessage());
-      return false;
-    }
-  }
-
-  @Override
-  public void close() throws Exception {
-    if (zkManager  != null) {
-      zkManager.close();
-    }
-  }
-
-  @Override
-  public boolean isDriverReady() {
-    if (zkManager == null) {
-      return false;
-    }
-    CuratorFramework curator = zkManager.getCurator();
-    if (curator == null) {
-      return false;
-    }
-    return curator.getState() == CuratorFrameworkState.STARTED;
-  }
-
-  @Override
-  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
-      throws IOException {
-    verifyDriverReady();
-    long start = monotonicNow();
-    List<T> ret = new ArrayList<>();
-    String znode = getZNodeForClass(clazz);
-    try {
-      List<String> children = zkManager.getChildren(znode);
-      for (String child : children) {
-        try {
-          String path = getNodePath(znode, child);
-          Stat stat = new Stat();
-          String data = zkManager.getStringData(path, stat);
-          boolean corrupted = false;
-          if (data == null || data.equals("")) {
-            // All records should have data, otherwise this is corrupted
-            corrupted = true;
-          } else {
-            try {
-              T record = createRecord(data, stat, clazz);
-              ret.add(record);
-            } catch (IOException e) {
-              LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
-                  clazz.getSimpleName(), data, e.getMessage());
-              corrupted = true;
-            }
-          }
-
-          if (corrupted) {
-            LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
-                child, path);
-            zkManager.delete(path);
-          }
-        } catch (Exception e) {
-          LOG.error("Cannot get data for {}: {}", child, e.getMessage());
-        }
-      }
-    } catch (Exception e) {
-      getMetrics().addFailure(monotonicNow() - start);
-      String msg = "Cannot get children for \"" + znode + "\": " +
-          e.getMessage();
-      LOG.error(msg);
-      throw new IOException(msg);
-    }
-    long end = monotonicNow();
-    getMetrics().addRead(end - start);
-    return new QueryResult<T>(ret, getTime());
-  }
-
-  @Override
-  public <T extends BaseRecord> boolean putAll(
-      List<T> records, boolean update, boolean error) throws IOException {
-    verifyDriverReady();
-    if (records.isEmpty()) {
-      return true;
-    }
-
-    // All records should be the same
-    T record0 = records.get(0);
-    Class<? extends BaseRecord> recordClass = record0.getClass();
-    String znode = getZNodeForClass(recordClass);
-
-    long start = monotonicNow();
-    boolean status = true;
-    for (T record : records) {
-      String primaryKey = getPrimaryKey(record);
-      String recordZNode = getNodePath(znode, primaryKey);
-      byte[] data = serialize(record);
-      if (!writeNode(recordZNode, data, update, error)){
-        status = false;
-      }
-    }
-    long end = monotonicNow();
-    if (status) {
-      getMetrics().addWrite(end - start);
-    } else {
-      getMetrics().addFailure(end - start);
-    }
-    return status;
-  }
-
-  @Override
-  public <T extends BaseRecord> int remove(
-      Class<T> clazz, Query<T> query) throws IOException {
-    verifyDriverReady();
-    if (query == null) {
-      return 0;
-    }
-
-    // Read the current data
-    long start = monotonicNow();
-    List<T> records = null;
-    try {
-      QueryResult<T> result = get(clazz);
-      records = result.getRecords();
-    } catch (IOException ex) {
-      LOG.error("Cannot get existing records", ex);
-      getMetrics().addFailure(monotonicNow() - start);
-      return 0;
-    }
-
-    // Check the records to remove
-    String znode = getZNodeForClass(clazz);
-    List<T> recordsToRemove = filterMultiple(query, records);
-
-    // Remove the records
-    int removed = 0;
-    for (T existingRecord : recordsToRemove) {
-      LOG.info("Removing \"{}\"", existingRecord);
-      try {
-        String primaryKey = getPrimaryKey(existingRecord);
-        String path = getNodePath(znode, primaryKey);
-        if (zkManager.delete(path)) {
-          removed++;
-        } else {
-          LOG.error("Did not remove \"{}\"", existingRecord);
-        }
-      } catch (Exception e) {
-        LOG.error("Cannot remove \"{}\"", existingRecord, e);
-        getMetrics().addFailure(monotonicNow() - start);
-      }
-    }
-    long end = monotonicNow();
-    if (removed > 0) {
-      getMetrics().addRemove(end - start);
-    }
-    return removed;
-  }
-
-  @Override
-  public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
-      throws IOException {
-    long start = monotonicNow();
-    boolean status = true;
-    String znode = getZNodeForClass(clazz);
-    LOG.info("Deleting all children under {}", znode);
-    try {
-      List<String> children = zkManager.getChildren(znode);
-      for (String child : children) {
-        String path = getNodePath(znode, child);
-        LOG.info("Deleting {}", path);
-        zkManager.delete(path);
-      }
-    } catch (Exception e) {
-      LOG.error("Cannot remove {}: {}", znode, e.getMessage());
-      status = false;
-    }
-    long time = monotonicNow() - start;
-    if (status) {
-      getMetrics().addRemove(time);
-    } else {
-      getMetrics().addFailure(time);
-    }
-    return status;
-  }
-
-  private boolean writeNode(
-      String znode, byte[] bytes, boolean update, boolean error) {
-    try {
-      boolean created = zkManager.create(znode);
-      if (!update && !created && error) {
-        LOG.info("Cannot write record \"{}\", it already exists", znode);
-        return false;
-      }
-
-      // Write data
-      zkManager.setData(znode, bytes, -1);
-      return true;
-    } catch (Exception e) {
-      LOG.error("Cannot write record \"{}\": {}", znode, e.getMessage());
-    }
-    return false;
-  }
-
-  /**
-   * Get the ZNode for a class.
-   *
-   * @param clazz Record class to evaluate.
-   * @return The ZNode for the class.
-   */
-  private <T extends BaseRecord> String getZNodeForClass(Class<T> clazz) {
-    String className = getRecordName(clazz);
-    return getNodePath(baseZNode, className);
-  }
-
-  /**
-   * Creates a record from a string returned by ZooKeeper.
-   *
-   * @param data The data to write.
-   * @param stat Stat of the data record to create.
-   * @param clazz The data record type to create.
-   * @return The created record.
-   * @throws IOException
-   */
-  private <T extends BaseRecord> T createRecord(
-      String data, Stat stat, Class<T> clazz) throws IOException {
-    T record = newRecord(data, clazz, false);
-    record.setDateCreated(stat.getCtime());
-    record.setDateModified(stat.getMtime());
-    return record;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java
deleted file mode 100644
index a18433e..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Implementations of state store data providers/drivers. Each driver is
- * responsible for maintaining, querying, updating and deleting persistent data
- * records. Data records are defined as subclasses of
- * {@link org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord}.
- * Each driver implements the
- * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
- * StateStoreDriver} interface.
- * <p>
- * Currently supported drivers:
- * <ul>
- * <li>{@link StateStoreFileImpl} A file-based data storage backend.
- * <li>{@link StateStoreZooKeeperImpl} A zookeeper based data storage backend.
- * </ul>
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java
deleted file mode 100644
index da998b5..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * The state store uses modular data storage classes derived from
- * StateStoreDriver to handle querying, updating and deleting data records. The
- * data storage driver is initialized and maintained by the StateStoreService.
- * The state store supports fetching all records of a type, filtering by column
- * values or fetching a single record by its primary key.
- * <p>
- * Each data storage backend is required to implement the methods contained in
- * the StateStoreDriver interface. These methods allow the querying, updating,
- * inserting and deleting of data records into the state store.
- */
-
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-
-package org.apache.hadoop.hdfs.server.federation.store.driver;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
deleted file mode 100644
index 57b7b61..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.store.impl;
-
-import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
-import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
-import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
-import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
-import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationResponse;
-import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
-import org.apache.hadoop.hdfs.server.federation.store.records.Query;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of the {@link MembershipStore} State Store API.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class MembershipStoreImpl
-    extends MembershipStore implements StateStoreCache {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(MembershipStoreImpl.class);
-
-
-  /** Reported namespaces that are not decommissioned. */
-  private final Set<FederationNamespaceInfo> activeNamespaces;
-
-  /** Namenodes (after evaluating the quorum) that are active in the cluster. */
-  private final Map<String, MembershipState> activeRegistrations;
-  /** Namenode status reports (raw) that were discarded for being too old. */
-  private final Map<String, MembershipState> expiredRegistrations;
-
-  /** Lock to access the local memory cache. */
-  private final ReadWriteLock cacheReadWriteLock =
-      new ReentrantReadWriteLock();
-  private final Lock cacheReadLock = cacheReadWriteLock.readLock();
-  private final Lock cacheWriteLock = cacheReadWriteLock.writeLock();
-
-
-  public MembershipStoreImpl(StateStoreDriver driver) {
-    super(driver);
-
-    this.activeRegistrations = new HashMap<>();
-    this.expiredRegistrations = new HashMap<>();
-    this.activeNamespaces = new TreeSet<>();
-  }
-
-  @Override
-  public GetNamenodeRegistrationsResponse getExpiredNamenodeRegistrations(
-      GetNamenodeRegistrationsRequest request) throws IOException {
-
-    GetNamenodeRegistrationsResponse response =
-        GetNamenodeRegistrationsResponse.newInstance();
-    cacheReadLock.lock();
-    try {
-      Collection<MembershipState> vals = this.expiredRegistrations.values();
-      List<MembershipState> copyVals = new ArrayList<>(vals);
-      response.setNamenodeMemberships(copyVals);
-    } finally {
-      cacheReadLock.unlock();
-    }
-    return response;
-  }
-
-  @Override
-  public GetNamespaceInfoResponse getNamespaceInfo(
-      GetNamespaceInfoRequest request) throws IOException {
-
-    Set<FederationNamespaceInfo> namespaces = new HashSet<>();
-    try {
-      cacheReadLock.lock();
-      namespaces.addAll(activeNamespaces);
-    } finally {
-      cacheReadLock.unlock();
-    }
-
-    GetNamespaceInfoResponse response =
-        GetNamespaceInfoResponse.newInstance(namespaces);
-    return response;
-  }
-
-  @Override
-  public GetNamenodeRegistrationsResponse getNamenodeRegistrations(
-      final GetNamenodeRegistrationsRequest request) throws IOException {
-
-    // TODO Cache some common queries and sorts
-    List<MembershipState> ret = null;
-
-    cacheReadLock.lock();
-    try {
-      Collection<MembershipState> registrations = activeRegistrations.values();
-      MembershipState partialMembership = request.getPartialMembership();
-      if (partialMembership == null) {
-        ret = new ArrayList<>(registrations);
-      } else {
-        Query<MembershipState> query = new Query<>(partialMembership);
-        ret = filterMultiple(query, registrations);
-      }
-    } finally {
-      cacheReadLock.unlock();
-    }
-    // Sort in ascending update date order
-    Collections.sort(ret);
-
-    GetNamenodeRegistrationsResponse response =
-        GetNamenodeRegistrationsResponse.newInstance(ret);
-    return response;
-  }
-
-  @Override
-  public NamenodeHeartbeatResponse namenodeHeartbeat(
-      NamenodeHeartbeatRequest request) throws IOException {
-
-    MembershipState record = request.getNamenodeMembership();
-    String nnId = record.getNamenodeKey();
-    MembershipState existingEntry = null;
-    cacheReadLock.lock();
-    try {
-      existingEntry = this.activeRegistrations.get(nnId);
-    } finally {
-      cacheReadLock.unlock();
-    }
-
-    if (existingEntry != null) {
-      if (existingEntry.getState() != record.getState()) {
-        LOG.info("NN registration state has changed: {} -> {}",
-            existingEntry, record);
-      } else {
-        LOG.debug("Updating NN registration: {} -> {}", existingEntry, record);
-      }
-    } else {
-      LOG.info("Inserting new NN registration: {}", record);
-    }
-
-    boolean status = getDriver().put(record, true, false);
-
-    NamenodeHeartbeatResponse response =
-        NamenodeHeartbeatResponse.newInstance(status);
-    return response;
-  }
-
-  @Override
-  public boolean loadCache(boolean force) throws IOException {
-    super.loadCache(force);
-
-    // Update local cache atomically
-    cacheWriteLock.lock();
-    try {
-      this.activeRegistrations.clear();
-      this.expiredRegistrations.clear();
-      this.activeNamespaces.clear();
-
-      // Build list of NN registrations: nnId -> registration list
-      Map<String, List<MembershipState>> nnRegistrations = new HashMap<>();
-      List<MembershipState> cachedRecords = getCachedRecords();
-      for (MembershipState membership : cachedRecords) {
-        String nnId = membership.getNamenodeKey();
-        if (membership.getState() == FederationNamenodeServiceState.EXPIRED) {
-          // Expired, RPC service does not use these
-          String key = membership.getPrimaryKey();
-          this.expiredRegistrations.put(key, membership);
-        } else {
-          // This is a valid NN registration, build a list of all registrations
-          // using the NN id to use for the quorum calculation.
-          List<MembershipState> nnRegistration =
-              nnRegistrations.get(nnId);
-          if (nnRegistration == null) {
-            nnRegistration = new LinkedList<>();
-            nnRegistrations.put(nnId, nnRegistration);
-          }
-          nnRegistration.add(membership);
-          String bpId = membership.getBlockPoolId();
-          String cId = membership.getClusterId();
-          String nsId = membership.getNameserviceId();
-          FederationNamespaceInfo nsInfo =
-              new FederationNamespaceInfo(bpId, cId, nsId);
-          this.activeNamespaces.add(nsInfo);
-        }
-      }
-
-      // Calculate most representative entry for each active NN id
-      for (List<MembershipState> nnRegistration : nnRegistrations.values()) {
-        // Run quorum based on NN state
-        MembershipState representativeRecord =
-            getRepresentativeQuorum(nnRegistration);
-        String nnKey = representativeRecord.getNamenodeKey();
-        this.activeRegistrations.put(nnKey, representativeRecord);
-      }
-      LOG.debug("Refreshed {} NN registrations from State Store",
-          cachedRecords.size());
-    } finally {
-      cacheWriteLock.unlock();
-    }
-    return true;
-  }
-
-  @Override
-  public UpdateNamenodeRegistrationResponse updateNamenodeRegistration(
-      UpdateNamenodeRegistrationRequest request) throws IOException {
-
-    boolean status = false;
-    cacheWriteLock.lock();
-    try {
-      String namenode = MembershipState.getNamenodeKey(
-          request.getNameserviceId(), request.getNamenodeId());
-      MembershipState member = this.activeRegistrations.get(namenode);
-      if (member != null) {
-        member.setState(request.getState());
-        status = true;
-      }
-    } finally {
-      cacheWriteLock.unlock();
-    }
-    UpdateNamenodeRegistrationResponse response =
-        UpdateNamenodeRegistrationResponse.newInstance(status);
-    return response;
-  }
-
-  /**
-   * Picks the most recent entry in the subset that is most agreeable on the
-   * specified field. 1) If a majority of the collection has the same value for
-   * the field, the first sorted entry within the subset the matches the
-   * majority value 2) Otherwise the first sorted entry in the set of all
-   * entries
-   *
-   * @param records - Collection of state store record objects of the same type
-   * @return record that is most representative of the field name
-   */
-  private MembershipState getRepresentativeQuorum(
-      Collection<MembershipState> records) {
-
-    // Collate objects by field value: field value -> order set of records
-    Map<FederationNamenodeServiceState, TreeSet<MembershipState>> occurenceMap =
-        new HashMap<>();
-    for (MembershipState record : records) {
-      FederationNamenodeServiceState state = record.getState();
-      TreeSet<MembershipState> matchingSet = occurenceMap.get(state);
-      if (matchingSet == null) {
-        // TreeSet orders elements by descending date via comparators
-        matchingSet = new TreeSet<>();
-        occurenceMap.put(state, matchingSet);
-      }
-      matchingSet.add(record);
-    }
-
-    // Select largest group
-    TreeSet<MembershipState> largestSet = new TreeSet<>();
-    for (TreeSet<MembershipState> matchingSet : occurenceMap.values()) {
-      if (largestSet.size() < matchingSet.size()) {
-        largestSet = matchingSet;
-      }
-    }
-
-    // If quorum, use the newest element here
-    if (largestSet.size() > records.size() / 2) {
-      return largestSet.first();
-      // Otherwise, return most recent by class comparator
-    } else if (records.size() > 0) {
-      TreeSet<MembershipState> sortedList = new TreeSet<>(records);
-      LOG.debug("Quorum failed, using most recent: {}", sortedList.first());
-      return sortedList.first();
-    } else {
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
deleted file mode 100644
index eb117d6..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.store.impl;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer;
-import org.apache.hadoop.hdfs.server.federation.router.RouterPermissionChecker;
-import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
-import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
-import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
-import org.apache.hadoop.hdfs.server.federation.store.records.Query;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.util.Time;
-
-/**
- * Implementation of the {@link MountTableStore} state store API.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class MountTableStoreImpl extends MountTableStore {
-
-  public MountTableStoreImpl(StateStoreDriver driver) {
-    super(driver);
-  }
-
-  @Override
-  public AddMountTableEntryResponse addMountTableEntry(
-      AddMountTableEntryRequest request) throws IOException {
-    MountTable mountTable = request.getEntry();
-    if (mountTable != null) {
-      RouterPermissionChecker pc = RouterAdminServer.getPermissionChecker();
-      if (pc != null) {
-        pc.checkPermission(mountTable, FsAction.WRITE);
-      }
-    }
-
-    boolean status = getDriver().put(mountTable, false, true);
-    AddMountTableEntryResponse response =
-        AddMountTableEntryResponse.newInstance();
-    response.setStatus(status);
-    return response;
-  }
-
-  @Override
-  public UpdateMountTableEntryResponse updateMountTableEntry(
-      UpdateMountTableEntryRequest request) throws IOException {
-    MountTable mountTable = request.getEntry();
-    if (mountTable != null) {
-      RouterPermissionChecker pc = RouterAdminServer.getPermissionChecker();
-      if (pc != null) {
-        pc.checkPermission(mountTable, FsAction.WRITE);
-      }
-    }
-
-    boolean status = getDriver().put(mountTable, true, true);
-    UpdateMountTableEntryResponse response =
-        UpdateMountTableEntryResponse.newInstance();
-    response.setStatus(status);
-    return response;
-  }
-
-  @Override
-  public RemoveMountTableEntryResponse removeMountTableEntry(
-      RemoveMountTableEntryRequest request) throws IOException {
-    final String srcPath = request.getSrcPath();
-    final MountTable partial = MountTable.newInstance();
-    partial.setSourcePath(srcPath);
-    final Query<MountTable> query = new Query<>(partial);
-    final MountTable deleteEntry = getDriver().get(getRecordClass(), query);
-
-    boolean status = false;
-    if (deleteEntry != null) {
-      RouterPermissionChecker pc = RouterAdminServer.getPermissionChecker();
-      if (pc != null) {
-        pc.checkPermission(deleteEntry, FsAction.WRITE);
-      }
-      status = getDriver().remove(deleteEntry);
-    }
-
-    RemoveMountTableEntryResponse response =
-        RemoveMountTableEntryResponse.newInstance();
-    response.setStatus(status);
-    return response;
-  }
-
-  @Override
-  public GetMountTableEntriesResponse getMountTableEntries(
-      GetMountTableEntriesRequest request) throws IOException {
-    RouterPermissionChecker pc =
-        RouterAdminServer.getPermissionChecker();
-    // Get all values from the cache
-    List<MountTable> records = getCachedRecords();
-
-    // Sort and filter
-    Collections.sort(records, MountTable.SOURCE_COMPARATOR);
-    String reqSrcPath = request.getSrcPath();
-    if (reqSrcPath != null && !reqSrcPath.isEmpty()) {
-      // Return only entries beneath this path
-      Iterator<MountTable> it = records.iterator();
-      while (it.hasNext()) {
-        MountTable record = it.next();
-        String srcPath = record.getSourcePath();
-        if (!srcPath.startsWith(reqSrcPath)) {
-          it.remove();
-        } else if (pc != null) {
-          // do the READ permission check
-          try {
-            pc.checkPermission(record, FsAction.READ);
-          } catch (AccessControlException ignored) {
-            // Remove this mount table entry if it cannot
-            // be accessed by current user.
-            it.remove();
-          }
-        }
-      }
-    }
-
-    GetMountTableEntriesResponse response =
-        GetMountTableEntriesResponse.newInstance();
-    response.setEntries(records);
-    response.setTimestamp(Time.now());
-    return response;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java
deleted file mode 100644
index d58c288..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.store.impl;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
-import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse;
-import org.apache.hadoop.hdfs.server.federation.store.records.Query;
-import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
-import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
-
-/**
- * Implementation of the {@link RouterStore} state store API.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class RouterStoreImpl extends RouterStore {
-
-  public RouterStoreImpl(StateStoreDriver driver) {
-    super(driver);
-  }
-
-  @Override
-  public GetRouterRegistrationResponse getRouterRegistration(
-      GetRouterRegistrationRequest request) throws IOException {
-
-    final RouterState partial = RouterState.newInstance();
-    partial.setAddress(request.getRouterId());
-    final Query<RouterState> query = new Query<RouterState>(partial);
-    RouterState record = getDriver().get(getRecordClass(), query);
-    if (record != null) {
-      overrideExpiredRecord(record);
-    }
-    GetRouterRegistrationResponse response =
-        GetRouterRegistrationResponse.newInstance();
-    response.setRouter(record);
-    return response;
-  }
-
-  @Override
-  public GetRouterRegistrationsResponse getRouterRegistrations(
-      GetRouterRegistrationsRequest request) throws IOException {
-
-    // Get all values from the cache
-    QueryResult<RouterState> recordsAndTimeStamp =
-        getCachedRecordsAndTimeStamp();
-    List<RouterState> records = recordsAndTimeStamp.getRecords();
-    long timestamp = recordsAndTimeStamp.getTimestamp();
-
-    // Generate response
-    GetRouterRegistrationsResponse response =
-        GetRouterRegistrationsResponse.newInstance();
-    response.setRouters(records);
-    response.setTimestamp(timestamp);
-    return response;
-  }
-
-  @Override
-  public RouterHeartbeatResponse routerHeartbeat(RouterHeartbeatRequest request)
-      throws IOException {
-
-    RouterState record = request.getRouter();
-    boolean status = getDriver().put(record, true, false);
-    RouterHeartbeatResponse response =
-        RouterHeartbeatResponse.newInstance(status);
-    return response;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java
deleted file mode 100644
index 1a50d15..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains implementations of the state store API interfaces. All classes
- * derive from {@link
- * org.apache.hadoop.hdfs.server.federation.store.RecordStore}. The API
- * definitions are contained in the
- * org.apache.hadoop.hdfs.server.federation.store package.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-package org.apache.hadoop.hdfs.server.federation.store.impl;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java
deleted file mode 100644
index 949ec7c..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * The federation state store tracks persistent values that are shared between
- * multiple routers.
- * <p>
- * Data is stored in data records that inherit from a common class. Data records
- * are serialized when written to the data store using a modular serialization
- * implementation. The default is profobuf serialization. Data is stored as rows
- * of records of the same type with each data member in a record representing a
- * column.
- * <p>
- * The state store uses a modular data storage
- * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
- * StateStoreDriver} to handle querying, updating and deleting data records. The
- * data storage driver is initialized and maintained by the
- * {@link org.apache.hadoop.hdfs.server.federation.store.
- * StateStoreService FederationStateStoreService}. The state store
- * supports fetching all records of a type, filtering by column values or
- * fetching a single record by its primary key.
- * <p>
- * The state store contains several API interfaces, one for each data records
- * type.
- * <p>
- * <ul>
- * <li>FederationMembershipStateStore: state of all Namenodes in the federation.
- * Uses the MembershipState record.
- * <li>FederationMountTableStore: Mount table mapping paths in the global
- * namespace to individual subcluster paths. Uses the MountTable record.
- * <li>RouterStateStore: State of all routers in the federation. Uses the
- * RouterState record.
- * </ul>
- * <p>
- * Each API is defined in a separate interface. The implementations of these
- * interfaces are responsible for accessing the
- * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
- * StateStoreDriver} to query, update and delete data records.
- */
-
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-
-package org.apache.hadoop.hdfs.server.federation.store;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java
deleted file mode 100644
index 2d9f102..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.store.protocol;
-
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
-import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
-
-/**
- * API request for adding a mount table entry to the state store.
- */
-public abstract class AddMountTableEntryRequest {
-
-  public static AddMountTableEntryRequest newInstance() {
-    return StateStoreSerializer.newRecord(AddMountTableEntryRequest.class);
-  }
-
-  public static AddMountTableEntryRequest newInstance(MountTable newEntry) {
-    AddMountTableEntryRequest request = newInstance();
-    request.setEntry(newEntry);
-    return request;
-  }
-
-  @Public
-  @Unstable
-  public abstract MountTable getEntry();
-
-  @Public
-  @Unstable
-  public abstract void setEntry(MountTable mount);
-}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to