http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java
new file mode 100644
index 0000000..3915c56
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import static org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver.extractTempFileName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the multiple destination resolver.
+ */
+public class TestMultipleDestinationResolver {
+
+  private MultipleDestinationMountTableResolver resolver;
+
+  @Before
+  public void setup() throws IOException {
+    Configuration conf = new Configuration();
+    resolver = new MultipleDestinationMountTableResolver(conf, null);
+
+    // We manually point /tmp to only subcluster0
+    Map<String, String> map1 = new HashMap<>();
+    map1.put("subcluster0", "/tmp");
+    resolver.addEntry(MountTable.newInstance("/tmp", map1));
+
+    // We manually point / to subcluster0,1,2 with default order (hash)
+    Map<String, String> mapDefault = new HashMap<>();
+    mapDefault.put("subcluster0", "/");
+    mapDefault.put("subcluster1", "/");
+    mapDefault.put("subcluster2", "/");
+    MountTable defaultEntry = MountTable.newInstance("/", mapDefault);
+    resolver.addEntry(defaultEntry);
+
+    // We manually point /hash to subcluster0,1,2 with hashing
+    Map<String, String> mapHash = new HashMap<>();
+    mapHash.put("subcluster0", "/hash");
+    mapHash.put("subcluster1", "/hash");
+    mapHash.put("subcluster2", "/hash");
+    MountTable hashEntry = MountTable.newInstance("/hash", mapHash);
+    hashEntry.setDestOrder(DestinationOrder.HASH);
+    resolver.addEntry(hashEntry);
+
+    // We manually point /hashall to subcluster0,1,2 with hashing (full tree)
+    Map<String, String> mapHashAll = new HashMap<>();
+    mapHashAll.put("subcluster0", "/hashall");
+    mapHashAll.put("subcluster1", "/hashall");
+    mapHashAll.put("subcluster2", "/hashall");
+    MountTable hashEntryAll = MountTable.newInstance("/hashall", mapHashAll);
+    hashEntryAll.setDestOrder(DestinationOrder.HASH_ALL);
+    resolver.addEntry(hashEntryAll);
+
+    // We point /local to subclusters 0, 1, 2 with the local order
+    Map<String, String> mapLocal = new HashMap<>();
+    mapLocal.put("subcluster0", "/local");
+    mapLocal.put("subcluster1", "/local");
+    mapLocal.put("subcluster2", "/local");
+    MountTable localEntry = MountTable.newInstance("/local", mapLocal);
+    localEntry.setDestOrder(DestinationOrder.LOCAL);
+    resolver.addEntry(localEntry);
+
+    // We point /random to subclusters 0, 1, 2 with the random order
+    Map<String, String> mapRandom = new HashMap<>();
+    mapRandom.put("subcluster0", "/random");
+    mapRandom.put("subcluster1", "/random");
+    mapRandom.put("subcluster2", "/random");
+    MountTable randomEntry = MountTable.newInstance("/random", mapRandom);
+    randomEntry.setDestOrder(DestinationOrder.RANDOM);
+    resolver.addEntry(randomEntry);
+
+    // Read only mount point
+    Map<String, String> mapReadOnly = new HashMap<>();
+    mapReadOnly.put("subcluster0", "/readonly");
+    mapReadOnly.put("subcluster1", "/readonly");
+    mapReadOnly.put("subcluster2", "/readonly");
+    MountTable readOnlyEntry = MountTable.newInstance("/readonly", mapReadOnly);
+    readOnlyEntry.setReadOnly(true);
+    resolver.addEntry(readOnlyEntry);
+  }
+
+  @Test
+  public void testHashEqualDistribution() throws IOException {
+    // First level
+    testEvenDistribution("/hash");
+    testEvenDistribution("/hash/folder0", false);
+
+    // All levels
+    testEvenDistribution("/hashall");
+    testEvenDistribution("/hashall/folder0");
+  }
+
+  @Test
+  public void testHashAll() throws IOException {
+    // Files should be spread across subclusters
+    PathLocation dest0 = resolver.getDestinationForPath("/hashall/file0.txt");
+    assertDest("subcluster0", dest0);
+    PathLocation dest1 = resolver.getDestinationForPath("/hashall/file1.txt");
+    assertDest("subcluster1", dest1);
+
+    // Files within folder should be spread across subclusters
+    PathLocation dest2 = resolver.getDestinationForPath("/hashall/folder0");
+    assertDest("subcluster2", dest2);
+    PathLocation dest3 = resolver.getDestinationForPath(
+        "/hashall/folder0/file0.txt");
+    assertDest("subcluster1", dest3);
+    PathLocation dest4 = resolver.getDestinationForPath(
+        "/hashall/folder0/file1.txt");
+    assertDest("subcluster0", dest4);
+
+    PathLocation dest5 = resolver.getDestinationForPath(
+        "/hashall/folder0/folder0/file0.txt");
+    assertDest("subcluster1", dest5);
+    PathLocation dest6 = resolver.getDestinationForPath(
+        "/hashall/folder0/folder0/file1.txt");
+    assertDest("subcluster1", dest6);
+    PathLocation dest7 = resolver.getDestinationForPath(
+        "/hashall/folder0/folder0/file2.txt");
+    assertDest("subcluster0", dest7);
+
+    PathLocation dest8 = resolver.getDestinationForPath("/hashall/folder1");
+    assertDest("subcluster1", dest8);
+    PathLocation dest9 = resolver.getDestinationForPath(
+        "/hashall/folder1/file0.txt");
+    assertDest("subcluster0", dest9);
+    PathLocation dest10 = resolver.getDestinationForPath(
+        "/hashall/folder1/file1.txt");
+    assertDest("subcluster1", dest10);
+
+    PathLocation dest11 = resolver.getDestinationForPath("/hashall/folder2");
+    assertDest("subcluster2", dest11);
+    PathLocation dest12 = resolver.getDestinationForPath(
+        "/hashall/folder2/file0.txt");
+    assertDest("subcluster0", dest12);
+    PathLocation dest13 = resolver.getDestinationForPath(
+        "/hashall/folder2/file1.txt");
+    assertDest("subcluster0", dest13);
+    PathLocation dest14 = resolver.getDestinationForPath(
+        "/hashall/folder2/file2.txt");
+    assertDest("subcluster1", dest14);
+  }
+
+  @Test
+  public void testHashFirst() throws IOException {
+    PathLocation dest0 = resolver.getDestinationForPath("/hashall/file0.txt");
+    assertDest("subcluster0", dest0);
+    PathLocation dest1 = resolver.getDestinationForPath("/hashall/file1.txt");
+    assertDest("subcluster1", dest1);
+
+    // All these must be in the same location: subcluster0
+    PathLocation dest2 = resolver.getDestinationForPath("/hash/folder0");
+    assertDest("subcluster0", dest2);
+    PathLocation dest3 = resolver.getDestinationForPath(
+        "/hash/folder0/file0.txt");
+    assertDest("subcluster0", dest3);
+    PathLocation dest4 = resolver.getDestinationForPath(
+        "/hash/folder0/file1.txt");
+    assertDest("subcluster0", dest4);
+
+    PathLocation dest5 = resolver.getDestinationForPath(
+        "/hash/folder0/folder0/file0.txt");
+    assertDest("subcluster0", dest5);
+    PathLocation dest6 = resolver.getDestinationForPath(
+        "/hash/folder0/folder0/file1.txt");
+    assertDest("subcluster0", dest6);
+
+    // All these must be in the same location: subcluster2
+    PathLocation dest7 = resolver.getDestinationForPath("/hash/folder1");
+    assertDest("subcluster2", dest7);
+    PathLocation dest8 = resolver.getDestinationForPath(
+        "/hash/folder1/file0.txt");
+    assertDest("subcluster2", dest8);
+    PathLocation dest9 = resolver.getDestinationForPath(
+        "/hash/folder1/file1.txt");
+    assertDest("subcluster2", dest9);
+
+    // All these must be in the same location: subcluster2
+    PathLocation dest10 = resolver.getDestinationForPath("/hash/folder2");
+    assertDest("subcluster2", dest10);
+    PathLocation dest11 = resolver.getDestinationForPath(
+        "/hash/folder2/file0.txt");
+    assertDest("subcluster2", dest11);
+    PathLocation dest12 = resolver.getDestinationForPath(
+        "/hash/folder2/file1.txt");
+    assertDest("subcluster2", dest12);
+  }
+
+  @Test
+  public void testRandomEqualDistribution() throws IOException {
+    testEvenDistribution("/random");
+  }
+
+  @Test
+  public void testSingleDestination() throws IOException {
+    // All the files in /tmp should be in subcluster0
+    for (int f = 0; f < 100; f++) {
+      String filename = "/tmp/b/c/file" + f + ".txt";
+      PathLocation destination = resolver.getDestinationForPath(filename);
+      RemoteLocation loc = destination.getDefaultLocation();
+      assertEquals("subcluster0", loc.getNameserviceId());
+      assertEquals(filename, loc.getDest());
+    }
+  }
+
+  @Test
+  public void testResolveSubdirectories() throws Exception {
+    // Simulate a testdir under a multi-destination mount.
+    Random r = new Random();
+    String testDir = "/sort/testdir" + r.nextInt();
+    String file1 = testDir + "/file1" + r.nextInt();
+    String file2 = testDir + "/file2" + r.nextInt();
+
+    // Verify both files resolve to the same namespace as the parent dir.
+    PathLocation testDirLocation = resolver.getDestinationForPath(testDir);
+    RemoteLocation defaultLoc = testDirLocation.getDefaultLocation();
+    String testDirNamespace = defaultLoc.getNameserviceId();
+
+    PathLocation file1Location = resolver.getDestinationForPath(file1);
+    RemoteLocation defaultLoc1 = file1Location.getDefaultLocation();
+    assertEquals(testDirNamespace, defaultLoc1.getNameserviceId());
+
+    PathLocation file2Location = resolver.getDestinationForPath(file2);
+    RemoteLocation defaultLoc2 = file2Location.getDefaultLocation();
+    assertEquals(testDirNamespace, defaultLoc2.getNameserviceId());
+  }
+
+  @Test
+  public void testExtractTempFileName() {
+    for (String teststring : new String[] {
+        "testfile1.txt.COPYING",
+        "testfile1.txt._COPYING_",
+        "testfile1.txt._COPYING_.attempt_1486662804109_0055_m_000042_0",
+        "testfile1.txt.tmp",
+        "_temp/testfile1.txt",
+        "_temporary/testfile1.txt.af77e2ab-4bc5-4959-ae08-299c880ee6b8",
+        "_temporary/0/_temporary/attempt_201706281636_0007_m_000003_46/" +
+          "testfile1.txt" }) {
+      String finalName = extractTempFileName(teststring);
+      assertEquals("testfile1.txt", finalName);
+    }
+
+    // False cases
+    assertEquals(
+        "file1.txt.COPYING1", extractTempFileName("file1.txt.COPYING1"));
+    assertEquals("file1.txt.tmp2", extractTempFileName("file1.txt.tmp2"));
+
+    // Speculation patterns
+    String finalName = extractTempFileName(
+        "_temporary/part-00007.af77e2ab-4bc5-4959-ae08-299c880ee6b8");
+    assertEquals("part-00007", finalName);
+    finalName = extractTempFileName(
+        "_temporary/0/_temporary/attempt_201706281636_0007_m_000003_46/" +
+          "part-00003");
+    assertEquals("part-00003", finalName);
+
+    // Subfolders
+    finalName = extractTempFileName("folder0/testfile1.txt._COPYING_");
+    assertEquals("folder0/testfile1.txt", finalName);
+    finalName = extractTempFileName(
+        "folder0/folder1/testfile1.txt._COPYING_");
+    assertEquals("folder0/folder1/testfile1.txt", finalName);
+    finalName = extractTempFileName(
+        "processedHrsData.txt/_temporary/0/_temporary/" +
+        "attempt_201706281636_0007_m_000003_46/part-00003");
+    assertEquals("processedHrsData.txt/part-00003", finalName);
+  }
+
+  @Test
+  public void testReadOnly() throws IOException {
+    MountTable mount = resolver.getMountPoint("/readonly");
+    assertTrue(mount.isReadOnly());
+
+    PathLocation dest0 = resolver.getDestinationForPath("/readonly/file0.txt");
+    assertDest("subcluster1", dest0);
+    PathLocation dest1 = resolver.getDestinationForPath("/readonly/file1.txt");
+    assertDest("subcluster2", dest1);
+
+    // All these must be in the same location: subcluster1
+    PathLocation dest2 = resolver.getDestinationForPath("/readonly/folder0");
+    assertDest("subcluster1", dest2);
+    PathLocation dest3 = resolver.getDestinationForPath(
+        "/readonly/folder0/file0.txt");
+    assertDest("subcluster1", dest3);
+    PathLocation dest4 = resolver.getDestinationForPath(
+        "/readonly/folder0/file1.txt");
+    assertDest("subcluster1", dest4);
+
+    PathLocation dest5 = resolver.getDestinationForPath(
+        "/readonly/folder0/folder0/file0.txt");
+    assertDest("subcluster1", dest5);
+    PathLocation dest6 = resolver.getDestinationForPath(
+        "/readonly/folder0/folder0/file1.txt");
+    assertDest("subcluster1", dest6);
+
+    // All these must be in the same location: subcluster2
+    PathLocation dest7 = resolver.getDestinationForPath("/readonly/folder1");
+    assertDest("subcluster2", dest7);
+    PathLocation dest8 = resolver.getDestinationForPath(
+        "/readonly/folder1/file0.txt");
+    assertDest("subcluster2", dest8);
+    PathLocation dest9 = resolver.getDestinationForPath(
+        "/readonly/folder1/file1.txt");
+    assertDest("subcluster2", dest9);
+
+    // All these must be in the same location: subcluster1
+    PathLocation dest10 = resolver.getDestinationForPath("/readonly/folder2");
+    assertDest("subcluster1", dest10);
+    PathLocation dest11 = resolver.getDestinationForPath(
+        "/readonly/folder2/file0.txt");
+    assertDest("subcluster1", dest11);
+    PathLocation dest12 = resolver.getDestinationForPath(
+        "/readonly/folder2/file1.txt");
+    assertDest("subcluster1", dest12);
+  }
+
+  @Test
+  public void testLocalResolver() throws IOException {
+    PathLocation dest0 =
+        resolver.getDestinationForPath("/local/folder0/file0.txt");
+    assertDest("subcluster0", dest0);
+  }
+
+  @Test
+  public void testRandomResolver() throws IOException {
+    Set<String> destinations = new HashSet<>();
+    for (int i = 0; i < 30; i++) {
+      PathLocation dest =
+          resolver.getDestinationForPath("/random/folder0/file0.txt");
+      RemoteLocation firstDest = dest.getDestinations().get(0);
+      String nsId = firstDest.getNameserviceId();
+      destinations.add(nsId);
+    }
+    assertEquals(3, destinations.size());
+  }
+
+  /**
+   * Test that a path has files distributed across destinations evenly.
+   * @param path Path to check.
+   * @throws IOException If the distribution cannot be checked.
+   */
+  private void testEvenDistribution(final String path) throws IOException {
+    testEvenDistribution(path, true);
+  }
+
+  /**
+   * Check how the files under a path are distributed across destinations.
+   * @param path Path to check.
+   * @param even True if files should be spread evenly across subclusters.
+   * @throws IOException If the distribution cannot be checked.
+   */
+  private void testEvenDistribution(final String path, final boolean even)
+      throws IOException {
+
+    // Subcluster -> Files
+    Map<String, Set<String>> results = new HashMap<>();
+    for (int f = 0; f < 10000; f++) {
+      String filename = path + "/file" + f + ".txt";
+      PathLocation destination = resolver.getDestinationForPath(filename);
+      RemoteLocation loc = destination.getDefaultLocation();
+      assertEquals(filename, loc.getDest());
+
+      String nsId = loc.getNameserviceId();
+      if (!results.containsKey(nsId)) {
+        results.put(nsId, new TreeSet<>());
+      }
+      results.get(nsId).add(filename);
+    }
+
+    if (!even) {
+      // All files should be in one subcluster
+      assertEquals(1, results.size());
+    } else {
+      // Files should be distributed somewhat evenly
+      assertEquals(3, results.size());
+      int count = 0;
+      for (Set<String> files : results.values()) {
+        count = count + files.size();
+      }
+      int avg = count / results.keySet().size();
+      for (Set<String> files : results.values()) {
+        int filesCount = files.size();
+        // Check that the count in each namespace is within 20% of avg
+        assertTrue(filesCount > 0);
+        assertTrue(Math.abs(filesCount - avg) < (avg / 5));
+      }
+    }
+  }
+
+  private static void assertDest(String expectedDest, PathLocation loc) {
+    assertEquals(expectedDest, loc.getDestinations().get(0).getNameserviceId());
+  }
+}
\ No newline at end of file
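
For context on the HASH versus HASH_ALL assertions above: HASH keys the choice on the first path component under the mount point (so everything inside a folder stays together), while HASH_ALL keys it on the whole trailing path (so files spread at every level). A minimal sketch of that difference, assuming a simple modulo pick over the subcluster list (the real resolvers use a consistent hash ring; these class and method names are illustrative only):

    import java.util.List;

    public final class HashOrderSketch {

      /** HASH: hash only the first path component under the mount. */
      static String pickFirstLevel(String mount, String path, List<String> subclusters) {
        String trailing = path.substring(mount.length());   // e.g. "/folder0/file0.txt"
        String[] parts = trailing.split("/");                // ["", "folder0", "file0.txt"]
        String firstLevel = parts.length > 1 ? parts[1] : trailing;
        return subclusters.get(Math.floorMod(firstLevel.hashCode(), subclusters.size()));
      }

      /** HASH_ALL: hash the complete trailing path, spreading files at every level. */
      static String pickAllLevels(String mount, String path, List<String> subclusters) {
        String trailing = path.substring(mount.length());
        return subclusters.get(Math.floorMod(trailing.hashCode(), subclusters.size()));
      }
    }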

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java
new file mode 100644
index 0000000..00c2c13
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.ROUTERS;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.newStateStore;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the basic {@link ActiveNamenodeResolver} functionality.
+ */
+public class TestNamenodeResolver {
+
+  private static StateStoreService stateStore;
+  private static ActiveNamenodeResolver namenodeResolver;
+
+  @BeforeClass
+  public static void create() throws Exception {
+
+    Configuration conf = getStateStoreConfiguration();
+
+    // Reduce expirations to 5 seconds
+    conf.setLong(
+        RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
+        TimeUnit.SECONDS.toMillis(5));
+
+    stateStore = newStateStore(conf);
+    assertNotNull(stateStore);
+
+    namenodeResolver = new MembershipNamenodeResolver(conf, stateStore);
+    namenodeResolver.setRouterId(ROUTERS[0]);
+  }
+
+  @AfterClass
+  public static void destroy() throws Exception {
+    stateStore.stop();
+    stateStore.close();
+  }
+
+  @Before
+  public void setup() throws IOException, InterruptedException {
+    // Wait for state store to connect
+    stateStore.loadDriver();
+    waitStateStore(stateStore, 10000);
+
+    // Clear NN registrations
+    boolean cleared = clearRecords(stateStore, MembershipState.class);
+    assertTrue(cleared);
+  }
+
+  @Test
+  public void testStateStoreDisconnected() throws Exception {
+
+    // Add an entry to the store
+    NamenodeStatusReport report = createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE);
+    assertTrue(namenodeResolver.registerNamenode(report));
+
+    // Close the data store driver
+    stateStore.closeDriver();
+    assertFalse(stateStore.isDriverReady());
+
+    // Flush the caches
+    stateStore.refreshCaches(true);
+
+    // Verify commands fail due to no cached data and no state store
+    // connectivity.
+    List<? extends FederationNamenodeContext> nns =
+        namenodeResolver.getNamenodesForBlockPoolId(NAMESERVICES[0]);
+    assertNull(nns);
+
+    verifyException(namenodeResolver, "registerNamenode",
+        StateStoreUnavailableException.class,
+        new Class[] {NamenodeStatusReport.class}, new Object[] {report});
+  }
+
+  /**
+   * Verify the first registration on the resolver.
+   *
+   * @param nsId Nameservice identifier.
+   * @param nnId Namenode identifier within the nameservice.
+   * @param resultsCount Number of results expected.
+   * @param state Expected state for the first one.
+   * @throws IOException If we cannot get the namenodes.
+   */
+  private void verifyFirstRegistration(String nsId, String nnId,
+      int resultsCount, FederationNamenodeServiceState state)
+          throws IOException {
+    List<? extends FederationNamenodeContext> namenodes =
+        namenodeResolver.getNamenodesForNameserviceId(nsId);
+    if (resultsCount == 0) {
+      assertNull(namenodes);
+    } else {
+      assertEquals(resultsCount, namenodes.size());
+      if (namenodes.size() > 0) {
+        FederationNamenodeContext namenode = namenodes.get(0);
+        assertEquals(state, namenode.getState());
+        assertEquals(nnId, namenode.getNamenodeId());
+      }
+    }
+  }
+
+  @Test
+  public void testRegistrationExpired()
+      throws InterruptedException, IOException {
+
+    // Populate the state store with a single NN element
+    // 1) ns0:nn0 - Active
+    // Wait for the entry to expire without heartbeating
+    // Verify the NN entry is not accessible once expired.
+    NamenodeStatusReport report = createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE);
+    assertTrue(namenodeResolver.registerNamenode(report));
+
+    // Load cache
+    stateStore.refreshCaches(true);
+
+    // Verify
+    verifyFirstRegistration(
+        NAMESERVICES[0], NAMENODES[0], 1,
+        FederationNamenodeServiceState.ACTIVE);
+
+    // Wait past expiration (set in conf to 5 seconds)
+    Thread.sleep(6000);
+    // Reload cache
+    stateStore.refreshCaches(true);
+
+    // Verify entry is now expired and is no longer in the cache
+    verifyFirstRegistration(
+        NAMESERVICES[0], NAMENODES[0], 0,
+        FederationNamenodeServiceState.ACTIVE);
+
+    // Heartbeat again, updates dateModified
+    assertTrue(namenodeResolver.registerNamenode(report));
+    // Reload cache
+    stateStore.refreshCaches(true);
+
+    // Verify updated entry is marked active again and accessible to RPC server
+    verifyFirstRegistration(
+        NAMESERVICES[0], NAMENODES[0], 1,
+        FederationNamenodeServiceState.ACTIVE);
+  }
+
+  @Test
+  public void testRegistrationNamenodeSelection()
+      throws InterruptedException, IOException {
+
+    // 1) ns0:nn0 - Active
+    // 2) ns0:nn1 - Standby (newest)
+    // Verify the selected entry is the active entry
+    assertTrue(namenodeResolver.registerNamenode(
+        createNamenodeReport(
+            NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE)));
+    Thread.sleep(100);
+    assertTrue(namenodeResolver.registerNamenode(
+        createNamenodeReport(
+            NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
+
+    stateStore.refreshCaches(true);
+
+    verifyFirstRegistration(
+        NAMESERVICES[0], NAMENODES[0], 2,
+        FederationNamenodeServiceState.ACTIVE);
+
+    // 1) ns0:nn0 - Expired (stale)
+    // 2) ns0:nn1 - Standby (newest)
+    // Verify the selected entry is the standby entry as the active entry is
+    // stale
+    assertTrue(namenodeResolver.registerNamenode(
+        createNamenodeReport(
+            NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE)));
+
+    // Expire active registration
+    Thread.sleep(6000);
+
+    // Refresh standby registration
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
+
+    // Verify that standby is selected (active is now expired)
+    stateStore.refreshCaches(true);
+    verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 1,
+        FederationNamenodeServiceState.STANDBY);
+
+    // 1) ns0:nn0 - Active
+    // 2) ns0:nn1 - Unavailable (newest)
+    // Verify the selected entry is the active entry
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE)));
+    Thread.sleep(100);
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[1], null)));
+    stateStore.refreshCaches(true);
+    verifyFirstRegistration(NAMESERVICES[0], NAMENODES[0], 2,
+        FederationNamenodeServiceState.ACTIVE);
+
+    // 1) ns0:nn0 - Unavailable (newest)
+    // 2) ns0:nn1 - Standby
+    // Verify the selected entry is the standby entry
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
+    Thread.sleep(1000);
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[0], null)));
+
+    stateStore.refreshCaches(true);
+    verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 2,
+        FederationNamenodeServiceState.STANDBY);
+
+    // 1) ns0:nn0 - Active (oldest)
+    // 2) ns0:nn1 - Standby
+    // 3) ns0:nn2 - Active (newest)
+    // Verify the selected entry is the newest active entry
+    assertTrue(namenodeResolver.registerNamenode(
+        createNamenodeReport(NAMESERVICES[0], NAMENODES[0], null)));
+    Thread.sleep(100);
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
+    Thread.sleep(100);
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[2], HAServiceState.ACTIVE)));
+
+    stateStore.refreshCaches(true);
+    verifyFirstRegistration(NAMESERVICES[0], NAMENODES[2], 3,
+        FederationNamenodeServiceState.ACTIVE);
+
+    // 1) ns0:nn0 - Standby (oldest)
+    // 2) ns0:nn1 - Standby (newest)
+    // 3) ns0:nn2 - Standby
+    // Verify the selected entry is the newest standby entry
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[0], HAServiceState.STANDBY)));
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[2], HAServiceState.STANDBY)));
+    Thread.sleep(1500);
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
+
+    stateStore.refreshCaches(true);
+    verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 3,
+        FederationNamenodeServiceState.STANDBY);
+  }
+}
\ No newline at end of file
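
The expiration flow in testRegistrationExpired reduces to a timestamp check against FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS: every successful registerNamenode() refreshes the record's modification time, and a record older than the expiration window stops being served. A minimal sketch of that rule, with assumed field and method names (the real logic lives in the state store record handling):

    /** Sketch: a registration is only served while its last heartbeat is fresh. */
    final class RegistrationExpirySketch {
      private final long expirationMs; // 5 seconds in this test's configuration
      private long dateModified;       // refreshed by every registerNamenode() call

      RegistrationExpirySketch(long expirationMs) {
        this.expirationMs = expirationMs;
      }

      void heartbeat() {
        this.dateModified = System.currentTimeMillis();
      }

      boolean isExpired() {
        // The 6 second sleep in testRegistrationExpired pushes the entry past
        // the 5 second window, so the resolver stops returning it.
        return System.currentTimeMillis() - dateModified > expirationMs;
      }
    }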

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java
new file mode 100644
index 0000000..42ede62
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test the {@link LocalResolver}.
+ */
+public class TestLocalResolver {
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testLocalResolver() throws IOException {
+
+    // Mock the subcluster mapping
+    Configuration conf = new Configuration();
+    Router router = mock(Router.class);
+    StateStoreService stateStore = mock(StateStoreService.class);
+    MembershipStore membership = mock(MembershipStore.class);
+    when(router.getStateStore()).thenReturn(stateStore);
+    when(stateStore.getRegisteredRecordStore(any(Class.class)))
+        .thenReturn(membership);
+    GetNamenodeRegistrationsResponse response =
+        GetNamenodeRegistrationsResponse.newInstance();
+    // Set the mapping for each client
+    List<MembershipState> records = new LinkedList<>();
+    records.add(newMembershipState("client0", "subcluster0"));
+    records.add(newMembershipState("client1", "subcluster1"));
+    records.add(newMembershipState("client2", "subcluster2"));
+    response.setNamenodeMemberships(records);
+    when(membership.getNamenodeRegistrations(
+        any(GetNamenodeRegistrationsRequest.class))).thenReturn(response);
+
+    // Mock the client resolution: it will return whatever sb contains
+    StringBuilder sb = new StringBuilder("clientX");
+    LocalResolver localResolver = new LocalResolver(conf, router);
+    LocalResolver spyLocalResolver = spy(localResolver);
+    doAnswer(new Answer<String>() {
+      @Override
+      public String answer(InvocationOnMock invocation) throws Throwable {
+        return sb.toString();
+      }
+    }).when(spyLocalResolver).getClientAddr();
+
+    // Add the mocks to the resolver
+    MultipleDestinationMountTableResolver resolver =
+        new MultipleDestinationMountTableResolver(conf, router);
+    resolver.addResolver(DestinationOrder.LOCAL, spyLocalResolver);
+
+
+    // We point /local to subclusters 0, 1, 2 with the local order
+    Map<String, String> mapLocal = new HashMap<>();
+    mapLocal.put("subcluster0", "/local");
+    mapLocal.put("subcluster1", "/local");
+    mapLocal.put("subcluster2", "/local");
+    MountTable localEntry = MountTable.newInstance("/local", mapLocal);
+    localEntry.setDestOrder(DestinationOrder.LOCAL);
+    resolver.addEntry(localEntry);
+
+    // Test first with the default destination
+    PathLocation dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster0", dest);
+
+    // We change the client location and verify
+    setClient(sb, "client2");
+    dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster2", dest);
+
+    setClient(sb, "client1");
+    dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster1", dest);
+
+    setClient(sb, "client0");
+    dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster0", dest);
+  }
+
+  private void assertDestination(String expectedNsId, PathLocation loc) {
+    List<RemoteLocation> dests = loc.getDestinations();
+    RemoteLocation dest = dests.get(0);
+    assertEquals(expectedNsId, dest.getNameserviceId());
+  }
+
+  private MembershipState newMembershipState(String addr, String nsId) {
+    return MembershipState.newInstance(
+        "routerId", nsId, "nn0", "cluster0", "blockPool0",
+        addr + ":8001", addr + ":8002", addr + ":8003", addr + ":8004",
+        FederationNamenodeServiceState.ACTIVE, false);
+  }
+
+  /**
+   * Set the address of the client issuing the request. We use a StringBuilder
+   * to modify the value in place for the mock.
+   * @param sb StringBuilder to set the client string.
+   * @param client Address of the client.
+   */
+  private static void setClient(StringBuilder sb, String client) {
+    sb.replace(0, sb.length(), client);
+  }
+}
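
The test drives the LOCAL order by rewriting the client address the spy returns: the resolver looks up which subcluster the client's host registered under and prefers that destination. A rough sketch of that selection, assuming a host-to-nameservice map built from the mocked MembershipState records (names here are illustrative, not the LocalResolver internals):

    import java.util.List;
    import java.util.Map;

    final class LocalOrderSketch {
      /** Prefer the destination co-located with the client, if there is one. */
      static String pickLocal(String clientHost, Map<String, String> hostToNs,
          List<String> destinations) {
        String localNs = hostToNs.get(clientHost);  // e.g. "client1" -> "subcluster1"
        if (localNs != null && destinations.contains(localNs)) {
          return localNs;
        }
        return destinations.get(0);                 // otherwise keep the default order
      }
    }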

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
new file mode 100644
index 0000000..741d1f6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test functionalities of {@link ConnectionManager}, which manages a pool
+ * of connections to NameNodes.
+ */
+public class TestConnectionManager {
+  private Configuration conf;
+  private ConnectionManager connManager;
+  private static final String[] TEST_GROUP = new String[]{"TEST_GROUP"};
+  private static final UserGroupInformation TEST_USER1 =
+      UserGroupInformation.createUserForTesting("user1", TEST_GROUP);
+  private static final UserGroupInformation TEST_USER2 =
+      UserGroupInformation.createUserForTesting("user2", TEST_GROUP);
+  private static final UserGroupInformation TEST_USER3 =
+      UserGroupInformation.createUserForTesting("user3", TEST_GROUP);
+  private static final String TEST_NN_ADDRESS = "nn1:8080";
+
+  @Before
+  public void setup() throws Exception {
+    conf = new Configuration();
+    connManager = new ConnectionManager(conf);
+    NetUtils.addStaticResolution("nn1", "localhost");
+    NetUtils.createSocketAddrForHost("nn1", 8080);
+    connManager.start();
+  }
+
+  @After
+  public void shutdown() {
+    if (connManager != null) {
+      connManager.close();
+    }
+  }
+
+  @Test
+  public void testCleanup() throws Exception {
+    Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
+
+    ConnectionPool pool1 = new ConnectionPool(
+        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10);
+    addConnectionsToPool(pool1, 9, 4);
+    poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool1);
+
+    ConnectionPool pool2 = new ConnectionPool(
+        conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10);
+    addConnectionsToPool(pool2, 10, 10);
+    poolMap.put(new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS), pool2);
+
+    checkPoolConnections(TEST_USER1, 9, 4);
+    checkPoolConnections(TEST_USER2, 10, 10);
+
+    // Clean up the first pool: one connection should be removed, and the
+    // second pool should remain the same.
+    connManager.cleanup(pool1);
+    checkPoolConnections(TEST_USER1, 8, 4);
+    checkPoolConnections(TEST_USER2, 10, 10);
+
+    // Clean up the first pool again; it should have no effect since it has
+    // reached the MIN_ACTIVE_RATIO.
+    connManager.cleanup(pool1);
+    checkPoolConnections(TEST_USER1, 8, 4);
+    checkPoolConnections(TEST_USER2, 10, 10);
+
+    // Make sure the number of connections doesn't go below minSize
+    ConnectionPool pool3 = new ConnectionPool(
+        conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10);
+    addConnectionsToPool(pool3, 10, 0);
+    poolMap.put(new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS), pool3);
+    connManager.cleanup(pool3);
+    checkPoolConnections(TEST_USER3, 2, 0);
+    // With active connections added to the pool, make sure cleanup honors
+    // the MIN_ACTIVE_RATIO again
+    addConnectionsToPool(pool3, 10, 2);
+    connManager.cleanup(pool3);
+    checkPoolConnections(TEST_USER3, 4, 2);
+  }
+
+  @Test
+  public void testGetConnection() throws Exception {
+    Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
+    final int totalConns = 10;
+    int activeConns = 5;
+
+    ConnectionPool pool = new ConnectionPool(
+        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10);
+    addConnectionsToPool(pool, totalConns, activeConns);
+    poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool);
+
+    // All remaining connections should be usable
+    final int remainingSlots = totalConns - activeConns;
+    for (int i = 0; i < remainingSlots; i++) {
+      ConnectionContext cc = pool.getConnection();
+      assertTrue(cc.isUsable());
+      cc.getClient();
+      activeConns++;
+    }
+
+    checkPoolConnections(TEST_USER1, totalConns, activeConns);
+
+    // Asking for one more connection returns one that is already active
+    ConnectionContext cc = pool.getConnection();
+    assertTrue(cc.isActive());
+  }
+
+  private void addConnectionsToPool(ConnectionPool pool, int numTotalConn,
+      int numActiveConn) throws IOException {
+    for (int i = 0; i < numTotalConn; i++) {
+      ConnectionContext cc = pool.newConnection();
+      pool.addConnection(cc);
+      if (i < numActiveConn) {
+        cc.getClient();
+      }
+    }
+  }
+
+  private void checkPoolConnections(UserGroupInformation ugi,
+      int numOfConns, int numOfActiveConns) {
+    for (Map.Entry<ConnectionPoolId, ConnectionPool> e :
+        connManager.getPools().entrySet()) {
+      if (e.getKey().getUgi() == ugi) {
+        assertEquals(numOfConns, e.getValue().getNumConnections());
+        assertEquals(numOfActiveConns, e.getValue().getNumActiveConnections());
+      }
+    }
+  }
+
+}
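
The numbers asserted in testCleanup imply a simple rule: a cleanup pass keeps enough connections that the active ones make up at least MIN_ACTIVE_RATIO of the pool, never dropping below the pool's minimum size. A sketch of that arithmetic, assuming a ratio of 0.5 inferred from the 9->8 and 12->4 transitions (the constant and helper are illustrative, not the ConnectionManager internals):

    /** Sketch of the cleanup arithmetic implied by testCleanup's assertions. */
    final class CleanupSketch {
      static final double MIN_ACTIVE_RATIO = 0.5; // assumed from the test's numbers

      /** How many idle connections one cleanup pass may drop. */
      static int connectionsToRemove(int total, int active, int minSize) {
        int target = Math.max(minSize, (int) (active / MIN_ACTIVE_RATIO));
        return Math.max(0, total - target);
      }

      public static void main(String[] args) {
        System.out.println(connectionsToRemove(9, 4, 0));   // 1: pool1 shrinks 9 -> 8
        System.out.println(connectionsToRemove(8, 4, 0));   // 0: ratio reached, no-op
        System.out.println(connectionsToRemove(10, 0, 2));  // 8: pool3 floors at minSize 2
        System.out.println(connectionsToRemove(12, 2, 2));  // 8: pool3 shrinks 12 -> 4
      }
    }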

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java
new file mode 100644
index 0000000..877fb02
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.service.Service.STATE;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Test the service that heartbeats the state of the namenodes to the State
+ * Store.
+ */
+public class TestNamenodeHeartbeat {
+
+  private static RouterDFSCluster cluster;
+  private static ActiveNamenodeResolver namenodeResolver;
+  private static List<NamenodeHeartbeatService> services;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+
+    cluster = new RouterDFSCluster(true, 2);
+
+    // Start NNs and DNs and wait until ready
+    cluster.startCluster();
+
+    // Mock locator that records the heartbeats
+    List<String> nss = cluster.getNameservices();
+    String ns = nss.get(0);
+    Configuration conf = cluster.generateNamenodeConfiguration(ns);
+    namenodeResolver = new MockResolver(conf);
+    namenodeResolver.setRouterId("testrouter");
+
+    // Create one heartbeat service per NN
+    services = new ArrayList<>();
+    for (NamenodeContext nn : cluster.getNamenodes()) {
+      String nsId = nn.getNameserviceId();
+      String nnId = nn.getNamenodeId();
+      NamenodeHeartbeatService service = new NamenodeHeartbeatService(
+          namenodeResolver, nsId, nnId);
+      service.init(conf);
+      service.start();
+      services.add(service);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    cluster.shutdown();
+    for (NamenodeHeartbeatService service: services) {
+      service.stop();
+      service.close();
+    }
+  }
+
+  @Test
+  public void testNamenodeHeartbeatService() throws IOException {
+
+    RouterDFSCluster testCluster = new RouterDFSCluster(true, 1);
+    Configuration heartbeatConfig = testCluster.generateNamenodeConfiguration(
+        NAMESERVICES[0]);
+    NamenodeHeartbeatService server = new NamenodeHeartbeatService(
+        namenodeResolver, NAMESERVICES[0], NAMENODES[0]);
+    server.init(heartbeatConfig);
+    assertEquals(STATE.INITED, server.getServiceState());
+    server.start();
+    assertEquals(STATE.STARTED, server.getServiceState());
+    server.stop();
+    assertEquals(STATE.STOPPED, server.getServiceState());
+    server.close();
+  }
+
+  @Test
+  public void testHeartbeat() throws InterruptedException, IOException {
+
+    // Set NAMENODE1 to active for all nameservices
+    if (cluster.isHighAvailability()) {
+      for (String ns : cluster.getNameservices()) {
+        cluster.switchToActive(ns, NAMENODES[0]);
+        cluster.switchToStandby(ns, NAMENODES[1]);
+      }
+    }
+
+    // Wait for heartbeats to record
+    Thread.sleep(5000);
+
+    // Verify the locator has matching NN entries for each NS
+    for (String ns : cluster.getNameservices()) {
+      List<? extends FederationNamenodeContext> nns =
+          namenodeResolver.getNamenodesForNameserviceId(ns);
+
+      // Active
+      FederationNamenodeContext active = nns.get(0);
+      assertEquals(NAMENODES[0], active.getNamenodeId());
+
+      // Standby
+      FederationNamenodeContext standby = nns.get(1);
+      assertEquals(NAMENODES[1], standby.getNamenodeId());
+    }
+
+    // Switch the active NN in one of the two nameservices
+    List<String> nss = cluster.getNameservices();
+    String failoverNS = nss.get(0);
+    String normalNs = nss.get(1);
+
+    cluster.switchToStandby(failoverNS, NAMENODES[0]);
+    cluster.switchToActive(failoverNS, NAMENODES[1]);
+
+    // Wait for heartbeats to record
+    Thread.sleep(5000);
+
+    // Verify the locator has recorded the failover for the failover NS
+    List<? extends FederationNamenodeContext> failoverNSs =
+        namenodeResolver.getNamenodesForNameserviceId(failoverNS);
+    // Active
+    FederationNamenodeContext active = failoverNSs.get(0);
+    assertEquals(NAMENODES[1], active.getNamenodeId());
+
+    // Standby
+    FederationNamenodeContext standby = failoverNSs.get(1);
+    assertEquals(NAMENODES[0], standby.getNamenodeId());
+
+    // Verify the locator has the same records for the other ns
+    List<? extends FederationNamenodeContext> normalNss =
+        namenodeResolver.getNamenodesForNameserviceId(normalNs);
+    // Active
+    active = normalNss.get(0);
+    assertEquals(NAMENODES[0], active.getNamenodeId());
+    // Standby
+    standby = normalNss.get(1);
+    assertEquals(NAMENODES[1], standby.getNamenodeId());
+  }
+}
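
Each NamenodeHeartbeatService wraps a periodic task: poll one namenode for its HA state, build a NamenodeStatusReport, and push it into the resolver, which is why the test sleeps before checking the recorded states. A simplified sketch of one iteration (the polling helper is hypothetical; the real service derives the report from the namenode's RPC and web interfaces):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
    import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;

    /** Sketch: one heartbeat iteration for a single namenode. */
    final class HeartbeatIterationSketch {
      private final ActiveNamenodeResolver resolver;
      private final String nsId;
      private final String nnId;

      HeartbeatIterationSketch(ActiveNamenodeResolver resolver, String nsId, String nnId) {
        this.resolver = resolver;
        this.nsId = nsId;
        this.nnId = nnId;
      }

      void runOnce() {
        try {
          NamenodeStatusReport report = pollNamenode(nsId, nnId); // hypothetical helper
          resolver.registerNamenode(report);
        } catch (IOException e) {
          // A missed heartbeat just lets the registration age toward expiration.
        }
      }

      private NamenodeStatusReport pollNamenode(String ns, String nn) throws IOException {
        throw new UnsupportedOperationException("illustration only");
      }
    }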

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java
new file mode 100644
index 0000000..e130b7b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.TestConfigurationFieldsBase;
+
+import java.util.HashSet;
+
+/**
+ * Unit test class to compare the following RBF configuration class:
+ * <p></p>
+ * {@link RBFConfigKeys}
+ * <p></p>
+ * against hdfs-rbf-default.xml for missing properties.
+ * <p></p>
+ * Refer to {@link org.apache.hadoop.conf.TestConfigurationFieldsBase}
+ * for how this class works.
+ */
+public class TestRBFConfigFields extends TestConfigurationFieldsBase {
+  @Override
+  public void initializeMemberVariables() {
+    xmlFilename = "hdfs-rbf-default.xml";
+    configurationClasses = new Class[] {RBFConfigKeys.class};
+
+    // Set error modes
+    errorIfMissingConfigProps = true;
+    errorIfMissingXmlProps = true;
+
+    // Initialize used variables
+    configurationPropsToSkipCompare = new HashSet<String>();
+
+    // Allocate
+    xmlPropsToSkipCompare = new HashSet<String>();
+    xmlPrefixToSkipCompare = new HashSet<String>();
+  }
+}
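
TestConfigurationFieldsBase does its comparison by reflection: it collects the String key constants declared on the configuration class and diffs them against the property names in the XML resource, in both directions. A standalone simplification of the scan (not the base class's actual code):

    import java.lang.reflect.Field;
    import java.lang.reflect.Modifier;
    import java.util.HashSet;
    import java.util.Set;

    final class ConfigFieldScanSketch {
      /** Collect the public static final String key constants on a keys class. */
      static Set<String> declaredKeys(Class<?> keysClass) throws IllegalAccessException {
        Set<String> keys = new HashSet<>();
        for (Field f : keysClass.getDeclaredFields()) {
          int mods = f.getModifiers();
          if (Modifier.isPublic(mods) && Modifier.isStatic(mods)
              && Modifier.isFinal(mods) && f.getType() == String.class
              && !f.getName().endsWith("_DEFAULT")) {
            keys.add((String) f.get(null));
          }
        }
        return keys; // diff this set against the property names in hdfs-rbf-default.xml
      }
    }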

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
new file mode 100644
index 0000000..39398f7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.service.Service.STATE;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the startup and shutdown of the {@link Router} and
+ * the services it controls.
+ */
+public class TestRouter {
+
+  private static Configuration conf;
+
+  @BeforeClass
+  public static void create() throws IOException {
+    // Basic configuration without the state store
+    conf = new Configuration();
+    // 1 ms cache refresh
+    conf.setInt(RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, 1);
+    // Mock resolver classes
+    conf.setClass(RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+        MockResolver.class, ActiveNamenodeResolver.class);
+    conf.setClass(RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+        MockResolver.class, FileSubclusterResolver.class);
+
+    // Bind to any available port
+    conf.set(RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
+    conf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(RBFConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0");
+    conf.set(RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(RBFConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(RBFConfigKeys.DFS_ROUTER_HTTP_BIND_HOST_KEY, "0.0.0.0");
+
+    // Simulate a co-located NN
+    conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns0");
+    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + "ns0");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + "ns0",
+            "127.0.0.1:" + 0);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + "ns0",
+            "127.0.0.1:" + 0);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + "ns0",
+            "0.0.0.0");
+  }
+
+  @AfterClass
+  public static void destroy() {
+  }
+
+  @Before
+  public void setup() throws IOException, URISyntaxException {
+  }
+
+  @After
+  public void cleanup() {
+  }
+
+  private static void testRouterStartup(Configuration routerConfig)
+      throws InterruptedException, IOException {
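+    // Walk the router through the full Hadoop Service lifecycle:
+    // NOTINITED -> INITED -> STARTED -> STOPPED.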
+    Router router = new Router();
+    assertEquals(STATE.NOTINITED, router.getServiceState());
+    router.init(routerConfig);
+    assertEquals(STATE.INITED, router.getServiceState());
+    router.start();
+    assertEquals(STATE.STARTED, router.getServiceState());
+    router.stop();
+    assertEquals(STATE.STOPPED, router.getServiceState());
+    router.close();
+  }
+
+  @Test
+  public void testRouterService() throws InterruptedException, IOException {
+
+    // Admin only
+    testRouterStartup(new RouterConfigBuilder(conf).admin().build());
+
+    // Http only
+    testRouterStartup(new RouterConfigBuilder(conf).http().build());
+
+    // Rpc only
+    testRouterStartup(new RouterConfigBuilder(conf).rpc().build());
+
+    // Metrics only
+    testRouterStartup(new RouterConfigBuilder(conf).metrics().build());
+
+    // Statestore only
+    testRouterStartup(new RouterConfigBuilder(conf).stateStore().build());
+
+    // Heartbeat only
+    testRouterStartup(new RouterConfigBuilder(conf).heartbeat().build());
+
+    // Run with all services
+    testRouterStartup(new RouterConfigBuilder(conf).all().build());
+  }
+
+  @Test
+  public void testRouterRestartRpcService() throws IOException {
+
+    // Start
+    Router router = new Router();
+    router.init(new RouterConfigBuilder(conf).rpc().build());
+    router.start();
+
+    // Verify RPC server is running
+    assertNotNull(router.getRpcServerAddress());
+    RouterRpcServer rpcServer = router.getRpcServer();
+    assertNotNull(rpcServer);
+    assertEquals(STATE.STARTED, rpcServer.getServiceState());
+
+    // Stop router and RPC server
+    router.stop();
+    assertEquals(STATE.STOPPED, rpcServer.getServiceState());
+
+    router.close();
+  }
+}
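
The sequence asserted by testRouterStartup() is the standard Hadoop
Service lifecycle. A minimal, self-contained sketch of the same walk
(assuming a Configuration carrying the overrides assembled in create()
above; the class name RouterLifecycleSketch is illustrative and not part
of the patch):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
  import org.apache.hadoop.hdfs.server.federation.router.Router;

  public class RouterLifecycleSketch {
    public static void main(String[] args) throws Exception {
      // Assumed: conf carries the mock resolvers and 127.0.0.1:0
      // addresses from create(); a bare Configuration would try to
      // reach real name services.
      Configuration conf = new Configuration();
      Router router = new Router();                             // NOTINITED
      router.init(new RouterConfigBuilder(conf).rpc().build()); // INITED
      router.start();           // STARTED, RPC bound to an ephemeral port
      router.stop();            // STOPPED
      router.close();           // releases any remaining resources
    }
  }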

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java
new file mode 100644
index 0000000..a8ffded
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * The administrator interface of the {@link Router} implemented by
+ * {@link RouterAdminServer}.
+ */
+public class TestRouterAdmin {
+
+  private static StateStoreDFSCluster cluster;
+  private static RouterContext routerContext;
+  public static final String RPC_BEAN =
+      "Hadoop:service=Router,name=FederationRPC";
+  private static List<MountTable> mockMountTable;
+  private static StateStoreService stateStore;
+
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+    cluster = new StateStoreDFSCluster(false, 1);
+    // Build and start a router with State Store + admin + RPC
+    Configuration conf = new RouterConfigBuilder()
+        .stateStore()
+        .admin()
+        .rpc()
+        .build();
+    cluster.addRouterOverrides(conf);
+    cluster.startRouters();
+    routerContext = cluster.getRandomRouter();
+    mockMountTable = cluster.generateMockMountTable();
+    Router router = routerContext.getRouter();
+    stateStore = router.getStateStore();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.stopRouter(routerContext);
+  }
+
+  @Before
+  public void testSetup() throws Exception {
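+    // Reset the State Store to the mock mount table before each test
+    // so every case starts from the same set of entries.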
+    assertTrue(
+        synchronizeRecords(stateStore, mockMountTable, MountTable.class));
+  }
+
+  @Test
+  public void testAddMountTable() throws IOException {
+    MountTable newEntry = MountTable.newInstance(
+        "/testpath", Collections.singletonMap("ns0", "/testdir"),
+        Time.now(), Time.now());
+
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTable = client.getMountTableManager();
+
+    // Existing mount table size
+    List<MountTable> records = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size(), records.size());
+
+    // Add
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(newEntry);
+    AddMountTableEntryResponse addResponse =
+        mountTable.addMountTableEntry(addRequest);
+    assertTrue(addResponse.getStatus());
+
+    // New mount table size
+    List<MountTable> records2 = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size() + 1, records2.size());
+  }
+
+  @Test
+  public void testAddDuplicateMountTable() throws IOException {
+    MountTable newEntry = MountTable.newInstance("/testpath",
+        Collections.singletonMap("ns0", "/testdir"), Time.now(), Time.now());
+
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTable = client.getMountTableManager();
+
+    // Existing mount table size
+    List<MountTable> entries1 = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size(), entries1.size());
+
+    // Add
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(newEntry);
+    AddMountTableEntryResponse addResponse =
+        mountTable.addMountTableEntry(addRequest);
+    assertTrue(addResponse.getStatus());
+
+    // New mount table size
+    List<MountTable> entries2 = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size() + 1, entries2.size());
+
+    // Add again, should fail
+    AddMountTableEntryResponse addResponse2 =
+        mountTable.addMountTableEntry(addRequest);
+    assertFalse(addResponse2.getStatus());
+  }
+
+  @Test
+  public void testAddReadOnlyMountTable() throws IOException {
+    MountTable newEntry = MountTable.newInstance(
+        "/readonly", Collections.singletonMap("ns0", "/testdir"),
+        Time.now(), Time.now());
+    newEntry.setReadOnly(true);
+
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTable = client.getMountTableManager();
+
+    // Existing mount table size
+    List<MountTable> records = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size(), records.size());
+
+    // Add
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(newEntry);
+    AddMountTableEntryResponse addResponse =
+        mountTable.addMountTableEntry(addRequest);
+    assertTrue(addResponse.getStatus());
+
+    // New mount table size
+    List<MountTable> records2 = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size() + 1, records2.size());
+
+    // Check that we have the read only entry
+    MountTable record = getMountTableEntry("/readonly");
+    assertEquals("/readonly", record.getSourcePath());
+    assertTrue(record.isReadOnly());
+
+    // Removing the new entry
+    RemoveMountTableEntryRequest removeRequest =
+        RemoveMountTableEntryRequest.newInstance("/readonly");
+    RemoveMountTableEntryResponse removeResponse =
+        mountTable.removeMountTableEntry(removeRequest);
+    assertTrue(removeResponse.getStatus());
+  }
+
+  @Test
+  public void testAddOrderMountTable() throws IOException {
+    testAddOrderMountTable(DestinationOrder.HASH);
+    testAddOrderMountTable(DestinationOrder.LOCAL);
+    testAddOrderMountTable(DestinationOrder.RANDOM);
+    testAddOrderMountTable(DestinationOrder.HASH_ALL);
+  }
+
+  private void testAddOrderMountTable(final DestinationOrder order)
+      throws IOException {
+    final String mnt = "/" + order;
+    MountTable newEntry = MountTable.newInstance(
+        mnt, Collections.singletonMap("ns0", "/testdir"),
+        Time.now(), Time.now());
+    newEntry.setDestOrder(order);
+
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTable = client.getMountTableManager();
+
+    // Add
+    AddMountTableEntryRequest addRequest;
+    AddMountTableEntryResponse addResponse;
+    addRequest = AddMountTableEntryRequest.newInstance(newEntry);
+    addResponse = mountTable.addMountTableEntry(addRequest);
+    assertTrue(addResponse.getStatus());
+
+    // Check that the new entry was created with the expected order
+    MountTable record = getMountTableEntry(mnt);
+    assertEquals(mnt, record.getSourcePath());
+    assertEquals(order, record.getDestOrder());
+
+    // Removing the new entry
+    RemoveMountTableEntryRequest removeRequest =
+        RemoveMountTableEntryRequest.newInstance(mnt);
+    RemoveMountTableEntryResponse removeResponse =
+        mountTable.removeMountTableEntry(removeRequest);
+    assertTrue(removeResponse.getStatus());
+  }
+
+  @Test
+  public void testRemoveMountTable() throws IOException {
+
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTable = client.getMountTableManager();
+
+    // Existing mount table size
+    List<MountTable> entries1 = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size(), entries1.size());
+
+    // Remove an entry
+    RemoveMountTableEntryRequest removeRequest =
+        RemoveMountTableEntryRequest.newInstance("/");
+    mountTable.removeMountTableEntry(removeRequest);
+
+    // New mount table size
+    List<MountTable> entries2 = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size() - 1, entries2.size());
+  }
+
+  @Test
+  public void testEditMountTable() throws IOException {
+
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTable = client.getMountTableManager();
+
+    // Verify starting condition
+    MountTable entry = getMountTableEntry("/");
+    assertEquals(
+        Collections.singletonList(new RemoteLocation("ns0", "/")),
+        entry.getDestinations());
+
+    // Edit the entry for /
+    MountTable updatedEntry = MountTable.newInstance(
+        "/", Collections.singletonMap("ns1", "/"), Time.now(), Time.now());
+    UpdateMountTableEntryRequest updateRequest =
+        UpdateMountTableEntryRequest.newInstance(updatedEntry);
+    mountTable.updateMountTableEntry(updateRequest);
+
+    // Verify edited condition
+    entry = getMountTableEntry("/");
+    assertEquals(
+        Collections.singletonList(new RemoteLocation("ns1", "/")),
+        entry.getDestinations());
+  }
+
+  @Test
+  public void testGetMountTable() throws IOException {
+
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTable = client.getMountTableManager();
+
+    // Verify size of table
+    List<MountTable> entries = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size(), entries.size());
+
+    // Verify all entries are present
+    int matches = 0;
+    for (MountTable e : entries) {
+      // Per-entry invariants: a single destination and valid timestamps
+      assertEquals(1, e.getDestinations().size());
+      assertNotNull(e.getDateCreated());
+      assertNotNull(e.getDateModified());
+      for (MountTable entry : mockMountTable) {
+        if (entry.getSourcePath().equals(e.getSourcePath())) {
+          matches++;
+        }
+      }
+    }
+    assertEquals(mockMountTable.size(), matches);
+  }
+
+  @Test
+  public void testGetSingleMountTableEntry() throws IOException {
+    MountTable entry = getMountTableEntry("/ns0");
+    assertNotNull(entry);
+    assertEquals("/ns0", entry.getSourcePath());
+  }
+
+  /**
+   * Gets an existing mount table record in the state store.
+   *
+   * @param mount The mount point of the record to get.
+   * @return The matching record if found, null if it is not found.
+   * @throws IOException If the state store could not be accessed.
+   */
+  private MountTable getMountTableEntry(final String mount)
+      throws IOException {
+    // Refresh the cache
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+
+    GetMountTableEntriesRequest request =
+        GetMountTableEntriesRequest.newInstance(mount);
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTable = client.getMountTableManager();
+    List<MountTable> results = getMountTableEntries(mountTable, request);
+    if (results.size() > 0) {
+      // Entries are sorted so the first result has the shortest mount path
+      return results.get(0);
+    }
+    return null;
+  }
+
+  private List<MountTable> getMountTableEntries(MountTableManager mountTable)
+      throws IOException {
+    GetMountTableEntriesRequest request =
+        GetMountTableEntriesRequest.newInstance("/");
+    return getMountTableEntries(mountTable, request);
+  }
+
+  private List<MountTable> getMountTableEntries(MountTableManager mountTable,
+      GetMountTableEntriesRequest request) throws IOException {
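+    // Reload the mount table cache so the read reflects recent updates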
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    GetMountTableEntriesResponse response =
+        mountTable.getMountTableEntries(request);
+    return response.getEntries();
+  }
+}
\ No newline at end of file
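
Every admin test above reduces to the same request/response round trip
against the MountTableManager obtained from the admin client. A minimal
sketch of an add-then-remove cycle (assuming a MountTableManager from a
running Router, e.g. routerContext.getAdminClient().getMountTableManager();
the class name MountTableRoundTrip and the /example mapping are
illustrative, not part of the patch):

  import java.io.IOException;
  import java.util.Collections;

  import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
  import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
  import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
  import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
  import org.apache.hadoop.util.Time;

  public class MountTableRoundTrip {
    static void addAndRemove(MountTableManager mountTable) throws IOException {
      // Map the router path /example to /data on subcluster ns0
      MountTable entry = MountTable.newInstance("/example",
          Collections.singletonMap("ns0", "/data"), Time.now(), Time.now());

      // Add the record; getStatus() reports whether the store accepted it
      boolean added = mountTable.addMountTableEntry(
          AddMountTableEntryRequest.newInstance(entry)).getStatus();

      // Remove it again by source path
      boolean removed = mountTable.removeMountTableEntry(
          RemoveMountTableEntryRequest.newInstance("/example")).getStatus();

      System.out.println("added=" + added + ", removed=" + removed);
    }
  }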

