http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
new file mode 100644
index 0000000..6111c6b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Tests Router admin commands.
+ */
+public class TestRouterAdminCLI {
+  private static StateStoreDFSCluster cluster;
+  private static RouterContext routerContext;
+  private static StateStoreService stateStore;
+
+  private static RouterAdmin admin;
+  private static RouterClient client;
+
+  private static final String TEST_USER = "test-user";
+
+  private final ByteArrayOutputStream out = new ByteArrayOutputStream();
+  private static final PrintStream OLD_OUT = System.out;
+
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+    cluster = new StateStoreDFSCluster(false, 1);
+    // Build and start a router with State Store + admin + RPC
+    Configuration conf = new RouterConfigBuilder()
+        .stateStore()
+        .admin()
+        .rpc()
+        .build();
+    cluster.addRouterOverrides(conf);
+
+    // Start routers
+    cluster.startRouters();
+
+    routerContext = cluster.getRandomRouter();
+    Router router = routerContext.getRouter();
+    stateStore = router.getStateStore();
+
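+    // Point the RouterAdmin tool at this Router's admin RPC address so the
+    // CLI commands below are served by the test Router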
+    Configuration routerConf = new Configuration();
+    InetSocketAddress routerSocket = router.getAdminServerAddress();
+    routerConf.setSocketAddr(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
+        routerSocket);
+    admin = new RouterAdmin(routerConf);
+    client = routerContext.getAdminClient();
+  }
+
+  @AfterClass
+  public static void tearDownCluster() {
+    cluster.stopRouter(routerContext);
+    cluster.shutdown();
+    cluster = null;
+  }
+
+  @After
+  public void tearDown() {
+    // restore System.out
+    System.setOut(OLD_OUT);
+  }
+
+  @Test
+  public void testAddMountTable() throws Exception {
+    String nsId = "ns0";
+    String src = "/test-addmounttable";
+    String dest = "/addmounttable";
+    String[] argv = new String[] {"-add", src, nsId, dest};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
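+    // Refresh the State Store cache so the new entry is visible to reads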
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
+        .newInstance(src);
+    GetMountTableEntriesResponse getResponse = client.getMountTableManager()
+        .getMountTableEntries(getRequest);
+    MountTable mountTable = getResponse.getEntries().get(0);
+
+    List<RemoteLocation> destinations = mountTable.getDestinations();
+    assertEquals(1, destinations.size());
+
+    assertEquals(src, mountTable.getSourcePath());
+    assertEquals(nsId, destinations.get(0).getNameserviceId());
+    assertEquals(dest, destinations.get(0).getDest());
+    assertFalse(mountTable.isReadOnly());
+
+    // test mount table update behavior: adding the same source again
+    // should merge in the new destination and flags
+    dest = dest + "-new";
+    argv = new String[] {"-add", src, nsId, dest, "-readonly"};
+    assertEquals(0, ToolRunner.run(admin, argv));
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+
+    getResponse = client.getMountTableManager()
+        .getMountTableEntries(getRequest);
+    mountTable = getResponse.getEntries().get(0);
+    assertEquals(2, mountTable.getDestinations().size());
+    assertEquals(nsId, mountTable.getDestinations().get(1).getNameserviceId());
+    assertEquals(dest, mountTable.getDestinations().get(1).getDest());
+    assertTrue(mountTable.isReadOnly());
+  }
+
+  @Test
+  public void testAddOrderMountTable() throws Exception {
+    testAddOrderMountTable(DestinationOrder.HASH);
+    testAddOrderMountTable(DestinationOrder.LOCAL);
+    testAddOrderMountTable(DestinationOrder.RANDOM);
+    testAddOrderMountTable(DestinationOrder.HASH_ALL);
+  }
+
+  private void testAddOrderMountTable(DestinationOrder order)
+      throws Exception {
+    final String mnt = "/" + order;
+    final String nsId = "ns0,ns1";
+    final String dest = "/";
+    String[] argv = new String[] {
+        "-add", mnt, nsId, dest, "-order", order.toString()};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    // Check the state in the State Store
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    MountTableManager mountTable = client.getMountTableManager();
+    GetMountTableEntriesRequest request =
+        GetMountTableEntriesRequest.newInstance(mnt);
+    GetMountTableEntriesResponse response =
+        mountTable.getMountTableEntries(request);
+    List<MountTable> entries = response.getEntries();
+    assertEquals(1, entries.size());
+    assertEquals(2, entries.get(0).getDestinations().size());
+    assertEquals(order, response.getEntries().get(0).getDestOrder());
+  }
+
+  @Test
+  public void testListMountTable() throws Exception {
+    String nsId = "ns0";
+    String src = "/test-lsmounttable";
+    String dest = "/lsmounttable";
+    String[] argv = new String[] {"-add", src, nsId, dest};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    // redirect System.out to capture the command output
+    System.setOut(new PrintStream(out));
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    argv = new String[] {"-ls", src};
+    assertEquals(0, ToolRunner.run(admin, argv));
+    assertTrue(out.toString().contains(src));
+
+    out.reset();
+    GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
+        .newInstance("/");
+    GetMountTableEntriesResponse getResponse = client.getMountTableManager()
+        .getMountTableEntries(getRequest);
+
+    // Test the ls command without an input path; it should list the
+    // mount table entries under the root path.
+    argv = new String[] {"-ls"};
+    assertEquals(0, ToolRunner.run(admin, argv));
+    assertTrue(out.toString().contains(src));
+    String outStr = out.toString();
+    // verify that all the mount table entries are listed
+    for(MountTable entry: getResponse.getEntries()) {
+      assertTrue(outStr.contains(entry.getSourcePath()));
+    }
+  }
+
+  @Test
+  public void testRemoveMountTable() throws Exception {
+    String nsId = "ns0";
+    String src = "/test-rmmounttable";
+    String dest = "/rmmounttable";
+    String[] argv = new String[] {"-add", src, nsId, dest};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
+        .newInstance(src);
+    GetMountTableEntriesResponse getResponse = client.getMountTableManager()
+        .getMountTableEntries(getRequest);
+    // ensure the mount table entry was added successfully
+    MountTable mountTable = getResponse.getEntries().get(0);
+    assertEquals(src, mountTable.getSourcePath());
+
+    argv = new String[] {"-rm", src};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    getResponse = client.getMountTableManager()
+        .getMountTableEntries(getRequest);
+    assertEquals(0, getResponse.getEntries().size());
+
+    // removing a non-existent mount point should print an error message
+    String invalidPath = "/invalid";
+    System.setOut(new PrintStream(out));
+    argv = new String[] {"-rm", invalidPath};
+    assertEquals(0, ToolRunner.run(admin, argv));
+    assertTrue(out.toString().contains(
+        "Cannot remove mount point " + invalidPath));
+  }
+
+  @Test
+  public void testMountTableDefaultACL() throws Exception {
+    String[] argv = new String[] {"-add", "/testpath0", "ns0", "/testdir0"};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
+        .newInstance("/testpath0");
+    GetMountTableEntriesResponse getResponse = client.getMountTableManager()
+        .getMountTableEntries(getRequest);
+    MountTable mountTable = getResponse.getEntries().get(0);
+
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    String group = ugi.getGroups().isEmpty() ? ugi.getShortUserName()
+        : ugi.getPrimaryGroupName();
+    assertEquals(ugi.getShortUserName(), mountTable.getOwnerName());
+    assertEquals(group, mountTable.getGroupName());
+    assertEquals((short) 0755, mountTable.getMode().toShort());
+  }
+
+  @Test
+  public void testMountTablePermissions() throws Exception {
+    // redirect System.out to capture the command output
+    System.setOut(new PrintStream(out));
+    // use superuser to add new mount table with only read permission
+    String[] argv = new String[] {"-add", "/testpath2-1", "ns0", "/testdir2-1",
+        "-owner", TEST_USER, "-group", TEST_USER, "-mode", "0455"};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    String superUser = UserGroupInformation.
+        getCurrentUser().getShortUserName();
+    // use normal user as current user to test
+    UserGroupInformation remoteUser = UserGroupInformation
+        .createRemoteUser(TEST_USER);
+    UserGroupInformation.setLoginUser(remoteUser);
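+    // from here on, the admin commands execute as TEST_USER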
+
+    // verify read permission: ls succeeds while add and rm commands fail
+    verifyExecutionResult("/testpath2-1", true, -1, -1);
+
+    // add new mount table with only write permission
+    argv = new String[] {"-add", "/testpath2-2", "ns0", "/testdir2-2",
+        "-owner", TEST_USER, "-group", TEST_USER, "-mode", "0255"};
+    assertEquals(0, ToolRunner.run(admin, argv));
+    verifyExecutionResult("/testpath2-2", false, 0, 0);
+
+    // set mount table entry with read and write permission
+    argv = new String[] {"-add", "/testpath2-3", "ns0", "/testdir2-3",
+        "-owner", TEST_USER, "-group", TEST_USER, "-mode", "0755"};
+    assertEquals(0, ToolRunner.run(admin, argv));
+    verifyExecutionResult("/testpath2-3", true, 0, 0);
+
+    // restore the login user
+    remoteUser = UserGroupInformation.createRemoteUser(superUser);
+    UserGroupInformation.setLoginUser(remoteUser);
+  }
+
+  /**
+   * Verify router admin commands execution result.
+   *
+   * @param mount
+   *          target mount table
+   * @param canRead
+   *          whether mount entries under the specified mount can be listed
+   * @param addCommandCode
+   *          expected return code of add command executed for specified mount
+   * @param rmCommandCode
+   *          expected return code of rm command executed for specified mount
+   * @throws Exception
+   */
+  private void verifyExecutionResult(String mount, boolean canRead,
+      int addCommandCode, int rmCommandCode) throws Exception {
+    String[] argv = null;
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+
+    out.reset();
+    // execute ls command
+    argv = new String[] {"-ls", mount};
+    assertEquals(0, ToolRunner.run(admin, argv));
+    assertEquals(canRead, out.toString().contains(mount));
+
+    // execute add/update command
+    argv = new String[] {"-add", mount, "ns0", mount + "newdir"};
+    assertEquals(addCommandCode, ToolRunner.run(admin, argv));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    // execute remove command
+    argv = new String[] {"-rm", mount};
+    assertEquals(rmCommandCode, ToolRunner.run(admin, argv));
+  }
+
+  @Test
+  public void testSetAndClearQuota() throws Exception {
+    String nsId = "ns0";
+    String src = "/test-QuotaMounttable";
+    String dest = "/QuotaMounttable";
+    String[] argv = new String[] {"-add", src, nsId, dest};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
+        .newInstance(src);
+    GetMountTableEntriesResponse getResponse = client.getMountTableManager()
+        .getMountTableEntries(getRequest);
+    MountTable mountTable = getResponse.getEntries().get(0);
+    RouterQuotaUsage quotaUsage = mountTable.getQuota();
+
+    // verify the default quota values before any quota is set
+    assertEquals(RouterQuotaUsage.QUOTA_USAGE_COUNT_DEFAULT,
+        quotaUsage.getFileAndDirectoryCount());
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getQuota());
+    assertEquals(RouterQuotaUsage.QUOTA_USAGE_COUNT_DEFAULT,
+        quotaUsage.getSpaceConsumed());
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getSpaceQuota());
+
+    long nsQuota = 50;
+    long ssQuota = 100;
+    argv = new String[] {"-setQuota", src, "-nsQuota", String.valueOf(nsQuota),
+        "-ssQuota", String.valueOf(ssQuota)};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    getResponse = client.getMountTableManager()
+        .getMountTableEntries(getRequest);
+    mountTable = getResponse.getEntries().get(0);
+    quotaUsage = mountTable.getQuota();
+
+    // verify if the quota is set
+    assertEquals(nsQuota, quotaUsage.getQuota());
+    assertEquals(ssQuota, quotaUsage.getSpaceQuota());
+
+    // use quota string for setting ss quota
+    String newSsQuota = "2m";
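+    // "2m" should be parsed as a binary size: 2 * 1024 * 1024 bytes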
+    argv = new String[] {"-setQuota", src, "-ssQuota", newSsQuota};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    getResponse = client.getMountTableManager()
+        .getMountTableEntries(getRequest);
+    mountTable = getResponse.getEntries().get(0);
+    quotaUsage = mountTable.getQuota();
+    // verify if ss quota is correctly set
+    assertEquals(2 * 1024 * 1024, quotaUsage.getSpaceQuota());
+
+    // test clrQuota command
+    argv = new String[] {"-clrQuota", src};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    getResponse = client.getMountTableManager()
+        .getMountTableEntries(getRequest);
+    mountTable = getResponse.getEntries().get(0);
+    quotaUsage = mountTable.getQuota();
+
+    // verify that the quota was unset successfully
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getQuota());
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getSpaceQuota());
+  }
+
+  @Test
+  public void testManageSafeMode() throws Exception {
+    // ensure the Router reaches the RUNNING state
+    waitState(RouterServiceState.RUNNING);
+    assertFalse(routerContext.getRouter().getRpcServer().isInSafeMode());
+    assertEquals(0, ToolRunner.run(admin,
+        new String[] {"-safemode", "enter"}));
+    // verify state
+    assertEquals(RouterServiceState.SAFEMODE,
+        routerContext.getRouter().getRouterState());
+    assertTrue(routerContext.getRouter().getRpcServer().isInSafeMode());
+
+    System.setOut(new PrintStream(out));
+    assertEquals(0, ToolRunner.run(admin,
+        new String[] {"-safemode", "get"}));
+    assertTrue(out.toString().contains("true"));
+
+    assertEquals(0, ToolRunner.run(admin,
+        new String[] {"-safemode", "leave"}));
+    // verify state
+    assertEquals(RouterServiceState.RUNNING,
+        routerContext.getRouter().getRouterState());
+    assertFalse(routerContext.getRouter().getRpcServer().isInSafeMode());
+
+    out.reset();
+    assertEquals(0, ToolRunner.run(admin,
+        new String[] {"-safemode", "get"}));
+    assertTrue(out.toString().contains("false"));
+  }
+
+  @Test
+  public void testCreateInvalidEntry() throws Exception {
+    String[] argv = new String[] {
+        "-add", "test-createInvalidEntry", "ns0", "/createInvalidEntry"};
+    assertEquals(-1, ToolRunner.run(admin, argv));
+
+    argv = new String[] {
+        "-add", "/test-createInvalidEntry", "ns0", "createInvalidEntry"};
+    assertEquals(-1, ToolRunner.run(admin, argv));
+
+    argv = new String[] {
+        "-add", null, "ns0", "/createInvalidEntry"};
+    assertEquals(-1, ToolRunner.run(admin, argv));
+
+    argv = new String[] {
+        "-add", "/test-createInvalidEntry", "ns0", null};
+    assertEquals(-1, ToolRunner.run(admin, argv));
+
+    argv = new String[] {
+        "-add", "", "ns0", "/createInvalidEntry"};
+    assertEquals(-1, ToolRunner.run(admin, argv));
+
+    argv = new String[] {
+        "-add", "/test-createInvalidEntry", null, "/createInvalidEntry"};
+    assertEquals(-1, ToolRunner.run(admin, argv));
+
+    argv = new String[] {
+        "-add", "/test-createInvalidEntry", "", "/createInvalidEntry"};
+    assertEquals(-1, ToolRunner.run(admin, argv));
+  }
+
+  /**
+   * Wait for the Router to transition to the expected state.
+   * @param expectedState Expected Router state.
+   * @throws Exception
+   */
+  private void waitState(final RouterServiceState expectedState)
+      throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return expectedState == routerContext.getRouter().getRouterState();
+      }
+    }, 1000, 30000);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterHeartbeatService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterHeartbeatService.java
new file mode 100644
index 0000000..80f2327
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterHeartbeatService.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for the Router heartbeat service.
+ */
+public class TestRouterHeartbeatService {
+  private Router router;
+  private final String routerId = "router1";
+  private TestingServer testingServer;
+  private CuratorFramework curatorFramework;
+
+  @Before
+  public void setup() throws Exception {
+    router = new Router();
+    router.setRouterId(routerId);
+    Configuration conf = new Configuration();
+    conf.setInt(RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, 1);
+    Configuration routerConfig =
+        new RouterConfigBuilder(conf).stateStore().build();
+    routerConfig.setLong(RBFConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
+        TimeUnit.HOURS.toMillis(1));
+    routerConfig.setClass(RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
+        StateStoreZooKeeperImpl.class, StateStoreDriver.class);
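+    // Use the ZooKeeper-backed State Store driver so the test can take the
+    // store down by stopping the embedded ZK server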
+
+    testingServer = new TestingServer();
+    String connectStr = testingServer.getConnectString();
+    curatorFramework = CuratorFrameworkFactory.builder()
+        .connectString(connectStr)
+        .retryPolicy(new RetryNTimes(100, 100))
+        .build();
+    curatorFramework.start();
+    routerConfig.set(CommonConfigurationKeys.ZK_ADDRESS, connectStr);
+    router.init(routerConfig);
+    router.start();
+
+    waitStateStore(router.getStateStore(), TimeUnit.SECONDS.toMicros(10));
+  }
+
+  @Test
+  public void testStateStoreUnavailable() throws IOException {
+    curatorFramework.close();
+    testingServer.stop();
+    router.getStateStore().stop();
+    // The driver is not ready
+    assertFalse(router.getStateStore().isDriverReady());
+
+    // Do a heartbeat; no exception should be thrown
+    RouterHeartbeatService heartbeatService =
+        new RouterHeartbeatService(router);
+    heartbeatService.updateStateStore();
+  }
+
+  @Test
+  public void testStateStoreAvailable() throws Exception {
+    // The driver is ready
+    StateStoreService stateStore = router.getStateStore();
+    assertTrue(router.getStateStore().isDriverReady());
+    RouterStore routerStore = router.getRouterStateManager();
+
+    // There should be no record for this router yet
+    stateStore.refreshCaches(true);
+    GetRouterRegistrationRequest request =
+        GetRouterRegistrationRequest.newInstance(routerId);
+    GetRouterRegistrationResponse response =
+        router.getRouterStateManager().getRouterRegistration(request);
+    RouterState routerState = response.getRouter();
+    String id = routerState.getRouterId();
+    StateStoreVersion version = routerState.getStateStoreVersion();
+    assertNull(id);
+    assertNull(version);
+
+    // Do a heartbeat
+    RouterHeartbeatService heartbeatService =
+        new RouterHeartbeatService(router);
+    heartbeatService.updateStateStore();
+
+    // We should have a record
+    stateStore.refreshCaches(true);
+    request = GetRouterRegistrationRequest.newInstance(routerId);
+    response = routerStore.getRouterRegistration(request);
+    routerState = response.getRouter();
+    id = routerState.getRouterId();
+    version = routerState.getStateStoreVersion();
+    assertNotNull(id);
+    assertNotNull(version);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (curatorFramework != null) {
+      curatorFramework.close();
+    }
+    if (testingServer != null) {
+      testingServer.stop();
+    }
+    if (router != null) {
+      router.shutDown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java
new file mode 100644
index 0000000..8702b3c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test a router end-to-end including the MountTable.
+ */
+public class TestRouterMountTable {
+
+  private static StateStoreDFSCluster cluster;
+  private static NamenodeContext nnContext;
+  private static RouterContext routerContext;
+  private static MountTableResolver mountTable;
+
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+
+    // Build and start a federated cluster
+    cluster = new StateStoreDFSCluster(false, 1);
+    Configuration conf = new RouterConfigBuilder()
+        .stateStore()
+        .admin()
+        .rpc()
+        .build();
+    cluster.addRouterOverrides(conf);
+    cluster.startCluster();
+    cluster.startRouters();
+    cluster.waitClusterUp();
+
+    // Get the end points
+    nnContext = cluster.getRandomNamenode();
+    routerContext = cluster.getRandomRouter();
+    Router router = routerContext.getRouter();
+    mountTable = (MountTableResolver) router.getSubclusterResolver();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (cluster != null) {
+      cluster.stopRouter(routerContext);
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void testReadOnly() throws Exception {
+
+    // Add a read only entry
+    MountTable readOnlyEntry = MountTable.newInstance(
+        "/readonly", Collections.singletonMap("ns0", "/testdir"));
+    readOnlyEntry.setReadOnly(true);
+    assertTrue(addMountTable(readOnlyEntry));
+
+    // Add a regular entry
+    MountTable regularEntry = MountTable.newInstance(
+        "/regular", Collections.singletonMap("ns0", "/testdir"));
+    assertTrue(addMountTable(regularEntry));
+
+    // Create a folder which should show up in all locations
+    final FileSystem nnFs = nnContext.getFileSystem();
+    final FileSystem routerFs = routerContext.getFileSystem();
+    assertTrue(routerFs.mkdirs(new Path("/regular/newdir")));
+
+    FileStatus dirStatusNn =
+        nnFs.getFileStatus(new Path("/testdir/newdir"));
+    assertTrue(dirStatusNn.isDirectory());
+    FileStatus dirStatusRegular =
+        routerFs.getFileStatus(new Path("/regular/newdir"));
+    assertTrue(dirStatusRegular.isDirectory());
+    FileStatus dirStatusReadOnly =
+        routerFs.getFileStatus(new Path("/readonly/newdir"));
+    assertTrue(dirStatusReadOnly.isDirectory());
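+    // both mount points resolve to ns0:/testdir, so the directory created
+    // through /regular is also visible through the read-only mount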
+
+    // Writing into a read-only path should fail
+    try {
+      routerFs.mkdirs(new Path("/readonly/newdirfail"));
+      fail("We should not be able to write into a read only mount point");
+    } catch (IOException ioe) {
+      String msg = ioe.getMessage();
+      assertTrue(msg.startsWith(
+          "/readonly/newdirfail is in a read only mount point"));
+    }
+  }
+
+  /**
+   * Add a mount table entry to the mount table through the admin API.
+   * @param entry Mount table entry to add.
+   * @return If it was successfully added.
+   * @throws IOException Problems adding entries.
+   */
+  private boolean addMountTable(final MountTable entry) throws IOException {
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTableManager = client.getMountTableManager();
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(entry);
+    AddMountTableEntryResponse addResponse =
+        mountTableManager.addMountTableEntry(addRequest);
+
+    // Reload the Router cache
+    mountTable.loadCache(true);
+
+    return addResponse.getStatus();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
new file mode 100644
index 0000000..eabc0fe
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the namenode monitoring behavior in the Router.
+ */
+public class TestRouterNamenodeMonitoring {
+
+  private static StateStoreDFSCluster cluster;
+  private static RouterContext routerContext;
+  private static MembershipNamenodeResolver resolver;
+
+  private String ns0;
+  private String ns1;
+  private long initializedTime;
+
+  @Before
+  public void setUp() throws Exception {
+    // Build and start a federated cluster with HA enabled
+    cluster = new StateStoreDFSCluster(true, 2);
+    // Enable heartbeat service and local heartbeat
+    Configuration routerConf = new RouterConfigBuilder()
+        .stateStore()
+        .admin()
+        .rpc()
+        .enableLocalHeartbeat(true)
+        .heartbeat()
+        .build();
+
+    // Specify local node (ns0.nn1) to monitor
+    ns0 = cluster.getNameservices().get(0);
+    NamenodeContext context = cluster.getNamenodes(ns0).get(1);
+    routerConf.set(DFS_NAMESERVICE_ID, ns0);
+    routerConf.set(DFS_HA_NAMENODE_ID_KEY, context.getNamenodeId());
+
+    // Specify namenodes (ns1.nn0,ns1.nn1) to monitor
+    StringBuilder sb = new StringBuilder();
+    ns1 = cluster.getNameservices().get(1);
+    for (NamenodeContext ctx : cluster.getNamenodes(ns1)) {
+      String suffix = ctx.getConfSuffix();
+      if (sb.length() != 0) {
+        sb.append(",");
+      }
+      sb.append(suffix);
+    }
+    // override with the namenodes: ns1.nn0,ns1.nn1
+    routerConf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
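+    // The monitored set is now ns0.nn1 (local) plus ns1.nn0 and ns1.nn1,
+    // deliberately leaving ns0.nn0 unmonitored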
+
+    cluster.addRouterOverrides(routerConf);
+    cluster.startCluster();
+    cluster.startRouters();
+    cluster.waitClusterUp();
+
+    routerContext = cluster.getRandomRouter();
+    resolver = (MembershipNamenodeResolver) routerContext.getRouter()
+        .getNamenodeResolver();
+    initializedTime = Time.now();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.stopRouter(routerContext);
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void testNamenodeMonitoring() throws Exception {
+    // Set nn0 to active for all nameservices
+    for (String ns : cluster.getNameservices()) {
+      cluster.switchToActive(ns, "nn0");
+      cluster.switchToStandby(ns, "nn1");
+    }
+
+    Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
+        .getRouter().getNamenodeHearbeatServices();
+    // manually trigger the heartbeat
+    for (NamenodeHeartbeatService service : heartbeatServices) {
+      service.periodicInvoke();
+    }
+
+    resolver.loadCache(true);
+    List<? extends FederationNamenodeContext> namespaceInfo0 =
+        resolver.getNamenodesForNameserviceId(ns0);
+    List<? extends FederationNamenodeContext> namespaceInfo1 =
+        resolver.getNamenodesForNameserviceId(ns1);
+
+    // The modified date won't be updated in ns0.nn0 since it isn't
+    // monitored by the Router.
+    assertEquals("nn0", namespaceInfo0.get(1).getNamenodeId());
+    assertTrue(namespaceInfo0.get(1).getDateModified() < initializedTime);
+
+    // other namenodes should be updated as expected
+    assertEquals("nn1", namespaceInfo0.get(0).getNamenodeId());
+    assertTrue(namespaceInfo0.get(0).getDateModified() > initializedTime);
+
+    assertEquals("nn0", namespaceInfo1.get(0).getNamenodeId());
+    assertTrue(namespaceInfo1.get(0).getDateModified() > initializedTime);
+
+    assertEquals("nn1", namespaceInfo1.get(1).getNamenodeId());
+    assertTrue(namespaceInfo1.get(1).getDateModified() > initializedTime);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
new file mode 100644
index 0000000..66a955e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Tests quota behaviors in Router-based Federation.
+ */
+public class TestRouterQuota {
+  private static StateStoreDFSCluster cluster;
+  private static NamenodeContext nnContext1;
+  private static NamenodeContext nnContext2;
+  private static RouterContext routerContext;
+  private static MountTableResolver resolver;
+
+  private static final int BLOCK_SIZE = 512;
+
+  @Before
+  public void setUp() throws Exception {
+
+    // Build and start a federated cluster
+    cluster = new StateStoreDFSCluster(false, 2);
+    Configuration routerConf = new RouterConfigBuilder()
+        .stateStore()
+        .admin()
+        .quota()
+        .rpc()
+        .build();
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL, "2s");
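+    // a short interval keeps the Router quota cache refreshing quickly
+    // for the tests below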
+
+    // override some HDFS settings used for testing the space quota
+    Configuration hdfsConf = new Configuration(false);
+    hdfsConf.setInt(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    hdfsConf.setInt(HdfsClientConfigKeys.DFS_REPLICATION_KEY, 1);
+
+    cluster.addRouterOverrides(routerConf);
+    cluster.addNamenodeOverrides(hdfsConf);
+    cluster.startCluster();
+    cluster.startRouters();
+    cluster.waitClusterUp();
+
+    nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null);
+    nnContext2 = cluster.getNamenode(cluster.getNameservices().get(1), null);
+    routerContext = cluster.getRandomRouter();
+    Router router = routerContext.getRouter();
+    resolver = (MountTableResolver) router.getSubclusterResolver();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.stopRouter(routerContext);
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void testNamespaceQuotaExceed() throws Exception {
+    long nsQuota = 3;
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+    final FileSystem nnFs2 = nnContext2.getFileSystem();
+
+    // Add two mount tables:
+    // /nsquota --> ns0---testdir1
+    // /nsquota/subdir --> ns1---testdir2
+    nnFs1.mkdirs(new Path("/testdir1"));
+    nnFs2.mkdirs(new Path("/testdir2"));
+    MountTable mountTable1 = MountTable.newInstance("/nsquota",
+        Collections.singletonMap("ns0", "/testdir1"));
+
+    mountTable1.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota).build());
+    addMountTable(mountTable1);
+
+    MountTable mountTable2 = MountTable.newInstance("/nsquota/subdir",
+        Collections.singletonMap("ns1", "/testdir2"));
+    mountTable2.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota).build());
+    addMountTable(mountTable2);
+
+    final FileSystem routerFs = routerContext.getFileSystem();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+
+      @Override
+      public Boolean get() {
+        boolean isNsQuotaViolated = false;
+        try {
+          // create new directories to trigger NSQuotaExceededException
+          routerFs.mkdirs(new Path("/nsquota/" + UUID.randomUUID()));
+          routerFs.mkdirs(new Path("/nsquota/subdir/" + UUID.randomUUID()));
+        } catch (NSQuotaExceededException e) {
+          isNsQuotaViolated = true;
+        } catch (IOException ignored) {
+        }
+        return isNsQuotaViolated;
+      }
+    }, 5000, 60000);
+    // mkdir directly against the underlying FileSystems should still succeed
+    nnFs1.mkdirs(new Path("/testdir1/" + UUID.randomUUID()));
+    nnFs2.mkdirs(new Path("/testdir2/" + UUID.randomUUID()));
+  }
+
+  @Test
+  public void testStorageSpaceQuotaExceed() throws Exception {
+    long ssQuota = 3071;
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+    final FileSystem nnFs2 = nnContext2.getFileSystem();
+
+    // Add two mount tables:
+    // /ssquota --> ns0---testdir3
+    // /ssquota/subdir --> ns1---testdir4
+    nnFs1.mkdirs(new Path("/testdir3"));
+    nnFs2.mkdirs(new Path("/testdir4"));
+    MountTable mountTable1 = MountTable.newInstance("/ssquota",
+        Collections.singletonMap("ns0", "/testdir3"));
+
+    mountTable1
+        .setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota).build());
+    addMountTable(mountTable1);
+
+    MountTable mountTable2 = MountTable.newInstance("/ssquota/subdir",
+        Collections.singletonMap("ns1", "/testdir4"));
+    mountTable2
+        .setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota).build());
+    addMountTable(mountTable2);
+
+    DFSClient routerClient = routerContext.getClient();
+    routerClient.create("/ssquota/file", true).close();
+    routerClient.create("/ssquota/subdir/file", true).close();
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+
+      @Override
+      public Boolean get() {
+        boolean isDsQuotaViolated = false;
+        try {
+          // append data to trigger DSQuotaExceededException
+          appendData("/ssquota/file", routerClient, BLOCK_SIZE);
+          appendData("/ssquota/subdir/file", routerClient, BLOCK_SIZE);
+        } catch (DSQuotaExceededException e) {
+          isDsQuotaViolated = true;
+        } catch (IOException ignored) {
+        }
+        return isDsQuotaViolated;
+      }
+    }, 5000, 60000);
+
+    // appending data to the destination paths on the underlying FileSystems
+    // should still succeed
+    appendData("/testdir3/file", nnContext1.getClient(), BLOCK_SIZE);
+    appendData("/testdir4/file", nnContext2.getClient(), BLOCK_SIZE);
+  }
+
+  /**
+   * Add a mount table entry to the mount table through the admin API.
+   * @param entry Mount table entry to add.
+   * @return If it was successfully added.
+   * @throws IOException Problems adding entries.
+   */
+  private boolean addMountTable(final MountTable entry) throws IOException {
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTableManager = client.getMountTableManager();
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(entry);
+    AddMountTableEntryResponse addResponse =
+        mountTableManager.addMountTableEntry(addRequest);
+
+    // Reload the Router cache
+    resolver.loadCache(true);
+
+    return addResponse.getStatus();
+  }
+
+  /**
+   * Append data to the specified file.
+   * @param path Path of the file.
+   * @param client DFS Client.
+   * @param dataLen Length of the data to write.
+   * @throws IOException
+   */
+  private void appendData(String path, DFSClient client, int dataLen)
+      throws IOException {
+    EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.APPEND);
+    HdfsDataOutputStream stream = client.append(path, 1024, createFlag, null,
+        null);
+    byte[] data = new byte[dataLen];
+    stream.write(data);
+    stream.close();
+  }
+
+  @Test
+  public void testSetQuota() throws Exception {
+    long nsQuota = 5;
+    long ssQuota = 100;
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+    final FileSystem nnFs2 = nnContext2.getFileSystem();
+
+    // Add two mount tables:
+    // /setquota --> ns0---testdir5
+    // /setquota/subdir --> ns1---testdir6
+    nnFs1.mkdirs(new Path("/testdir5"));
+    nnFs2.mkdirs(new Path("/testdir6"));
+    MountTable mountTable1 = MountTable.newInstance("/setquota",
+        Collections.singletonMap("ns0", "/testdir5"));
+    mountTable1
+        .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+        .spaceQuota(ssQuota).build());
+    addMountTable(mountTable1);
+
+    // don't set a quota for the subpath of the mount table
+    MountTable mountTable2 = MountTable.newInstance("/setquota/subdir",
+        Collections.singletonMap("ns1", "/testdir6"));
+    addMountTable(mountTable2);
+
+    RouterQuotaUpdateService updateService = routerContext.getRouter()
+        .getQuotaCacheUpdateService();
+    // ensure setQuota RPC call was invoked
+    updateService.periodicInvoke();
+
+    ClientProtocol client1 = nnContext1.getClient().getNamenode();
+    ClientProtocol client2 = nnContext2.getClient().getNamenode();
+    final QuotaUsage quota1 = client1.getQuotaUsage("/testdir5");
+    final QuotaUsage quota2 = client2.getQuotaUsage("/testdir6");
+
+    assertEquals(nsQuota, quota1.getQuota());
+    assertEquals(ssQuota, quota1.getSpaceQuota());
+    assertEquals(nsQuota, quota2.getQuota());
+    assertEquals(ssQuota, quota2.getSpaceQuota());
+  }
+
+  @Test
+  public void testGetQuota() throws Exception {
+    long nsQuota = 10;
+    long ssQuota = 100;
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+    final FileSystem nnFs2 = nnContext2.getFileSystem();
+
+    // Add two mount tables:
+    // /getquota --> ns0---/testdir7
+    // /getquota/subdir1 --> ns0---/testdir7/subdir
+    // /getquota/subdir2 --> ns1---/testdir8
+    nnFs1.mkdirs(new Path("/testdir7"));
+    nnFs1.mkdirs(new Path("/testdir7/subdir"));
+    nnFs2.mkdirs(new Path("/testdir8"));
+    MountTable mountTable1 = MountTable.newInstance("/getquota",
+        Collections.singletonMap("ns0", "/testdir7"));
+    mountTable1
+        .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+        .spaceQuota(ssQuota).build());
+    addMountTable(mountTable1);
+
+    MountTable mountTable2 = MountTable.newInstance("/getquota/subdir1",
+        Collections.singletonMap("ns0", "/testdir7/subdir"));
+    addMountTable(mountTable2);
+
+    MountTable mountTable3 = MountTable.newInstance("/getquota/subdir2",
+        Collections.singletonMap("ns1", "/testdir8"));
+    addMountTable(mountTable3);
+
+    // use router client to create new files
+    DFSClient routerClient = routerContext.getClient();
+    routerClient.create("/getquota/file", true).close();
+    routerClient.create("/getquota/subdir1/file", true).close();
+    routerClient.create("/getquota/subdir2/file", true).close();
+
+    ClientProtocol clientProtocol = routerContext.getClient().getNamenode();
+    RouterQuotaUpdateService updateService = routerContext.getRouter()
+        .getQuotaCacheUpdateService();
+    updateService.periodicInvoke();
+    final QuotaUsage quota = clientProtocol.getQuotaUsage("/getquota");
+    // the quota should be aggregated
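+    // across destinations: ns0:/testdir7 contributes 4 (the dir, its subdir
+    // and 2 files) and ns1:/testdir8 contributes 2 (the dir and 1 file)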
+    assertEquals(6, quota.getFileAndDirectoryCount());
+  }
+
+  @Test
+  public void testStaleQuotaRemoving() throws Exception {
+    long nsQuota = 20;
+    long ssQuota = 200;
+    String stalePath = "/stalequota";
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+
+    // Add one mount table:
+    // /stalequota --> ns0---/testdir9
+    nnFs1.mkdirs(new Path("/testdir9"));
+    MountTable mountTable = MountTable.newInstance(stalePath,
+        Collections.singletonMap("ns0", "/testdir9"));
+    mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+        .spaceQuota(ssQuota).build());
+    addMountTable(mountTable);
+
+    // Call periodicInvoke to ensure quota for stalePath was
+    // loaded into the quota manager.
+    RouterQuotaUpdateService updateService = routerContext.getRouter()
+        .getQuotaCacheUpdateService();
+    updateService.periodicInvoke();
+
+    // use the quota manager to get the quota usage and verify it
+    RouterQuotaManager quotaManager = routerContext.getRouter()
+        .getQuotaManager();
+    RouterQuotaUsage quota = quotaManager.getQuotaUsage(stalePath);
+    assertEquals(nsQuota, quota.getQuota());
+    assertEquals(ssQuota, quota.getSpaceQuota());
+
+    // remove stale path entry
+    removeMountTable(stalePath);
+    updateService.periodicInvoke();
+    // the stale entry should be removed and we will get null
+    quota = quotaManager.getQuotaUsage(stalePath);
+    assertNull(quota);
+  }
+
+  /**
+   * Remove a mount table entry from the mount table through the admin API.
+   * @param path Mount point path of the entry to remove.
+   * @return If it was successfully removed.
+   * @throws IOException Problems removing entries.
+   */
+  private boolean removeMountTable(String path) throws IOException {
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTableManager = client.getMountTableManager();
+    RemoveMountTableEntryRequest removeRequest = RemoveMountTableEntryRequest
+        .newInstance(path);
+    RemoveMountTableEntryResponse removeResponse = mountTableManager
+        .removeMountTableEntry(removeRequest);
+
+    // Reload the Router cache
+    resolver.loadCache(true);
+    return removeResponse.getStatus();
+  }
+
+  @Test
+  public void testQuotaUpdating() throws Exception {
+    long nsQuota = 30;
+    long ssQuota = 1024;
+    String path = "/updatequota";
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+
+    // Add one mount table:
+    // /updatequota --> ns0---/testdir10
+    nnFs1.mkdirs(new Path("/testdir10"));
+    MountTable mountTable = MountTable.newInstance(path,
+        Collections.singletonMap("ns0", "/testdir10"));
+    mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+        .spaceQuota(ssQuota).build());
+    addMountTable(mountTable);
+
+    // Call periodicInvoke to ensure the quota is updated in the quota
+    // manager and the State Store.
+    RouterQuotaUpdateService updateService = routerContext.getRouter()
+        .getQuotaCacheUpdateService();
+    updateService.periodicInvoke();
+
+    // verify initial quota value
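+    // (only the mount directory itself exists, hence a count of 1)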
+    List<MountTable> results = getMountTable(path);
+    MountTable updatedMountTable = !results.isEmpty() ? results.get(0) : null;
+    RouterQuotaUsage quota = updatedMountTable.getQuota();
+    assertEquals(nsQuota, quota.getQuota());
+    assertEquals(ssQuota, quota.getSpaceQuota());
+    assertEquals(1, quota.getFileAndDirectoryCount());
+    assertEquals(0, quota.getSpaceConsumed());
+
+    // mkdir and write a new file
+    final FileSystem routerFs = routerContext.getFileSystem();
+    routerFs.mkdirs(new Path(path + "/" + UUID.randomUUID()));
+    DFSClient routerClient = routerContext.getClient();
+    routerClient.create(path + "/file", true).close();
+    appendData(path + "/file", routerClient, BLOCK_SIZE);
+
+    updateService.periodicInvoke();
+    results = getMountTable(path);
+    updatedMountTable = !results.isEmpty() ? results.get(0) : null;
+    quota = updatedMountTable.getQuota();
+
+    // verify that the quota has been updated in the State Store
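+    // 3 = the mount directory, the new subdir and the new file; the
+    // consumed space is the single appended block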
+    assertEquals(nsQuota, quota.getQuota());
+    assertEquals(ssQuota, quota.getSpaceQuota());
+    assertEquals(3, quota.getFileAndDirectoryCount());
+    assertEquals(BLOCK_SIZE, quota.getSpaceConsumed());
+  }
+
+  /**
+   * Get the mount table entries for the specified path through the admin API.
+   * @param path Mount point path to look up.
+   * @return The mount table entries found.
+   * @throws IOException Problems getting entries.
+   */
+  private List<MountTable> getMountTable(String path) throws IOException {
+    // Reload the Router cache
+    resolver.loadCache(true);
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTableManager = client.getMountTableManager();
+    GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
+        .newInstance(path);
+    GetMountTableEntriesResponse getResponse = mountTableManager
+        .getMountTableEntries(getRequest);
+
+    return getResponse.getEntries();
+  }
+}
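
The addMountTable(...) helper called in the tests above falls outside this hunk. A minimal sketch of its likely shape, mirroring the removeMountTable()/getMountTable() helpers shown here and assuming the add side follows the same AddMountTableEntryRequest/AddMountTableEntryResponse request/response pattern as the remove and get calls:

  private boolean addMountTable(MountTable entry) throws IOException {
    // Sketch only: mirrors the helpers above; the add-side request/response
    // classes are assumed to follow the same protocol pattern.
    RouterClient client = routerContext.getAdminClient();
    MountTableManager mountTableManager = client.getMountTableManager();
    AddMountTableEntryRequest addRequest =
        AddMountTableEntryRequest.newInstance(entry);
    AddMountTableEntryResponse addResponse =
        mountTableManager.addMountTableEntry(addRequest);

    // Reload the Router cache so the new entry is visible immediately
    resolver.loadCache(true);
    return addResponse.getStatus();
  }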

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java
new file mode 100644
index 0000000..ce3ee17
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Set;
+
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for class {@link RouterQuotaManager}.
+ */
+public class TestRouterQuotaManager {
+  private static RouterQuotaManager manager;
+
+  @Before
+  public void setup() {
+    manager = new RouterQuotaManager();
+  }
+
+  @After
+  public void cleanup() {
+    manager.clear();
+  }
+
+  @Test
+  public void testGetChildrenPaths() {
+    RouterQuotaUsage quotaUsage = new RouterQuotaUsage.Builder().build();
+    manager.put("/path1", quotaUsage);
+    manager.put("/path2", quotaUsage);
+    manager.put("/path1/subdir", quotaUsage);
+    manager.put("/path1/subdir/subdir", quotaUsage);
+
+    Set<String> childrenPaths = manager.getPaths("/path1");
+    assertEquals(3, childrenPaths.size());
+    assertTrue(childrenPaths.contains("/path1/subdir")
+        && childrenPaths.contains("/path1/subdir/subdir")
+        && childrenPaths.contains("/path1"));
+
+    // corner case: a sibling path that shares the "/path3" name prefix
+    manager.put("/path3", quotaUsage);
+    manager.put("/path3/subdir", quotaUsage);
+    manager.put("/path3-subdir", quotaUsage);
+
+    childrenPaths = manager.getPaths("/path3");
+    assertEquals(2, childrenPaths.size());
+    // path /path3-subdir should not be returned
+    assertTrue(childrenPaths.contains("/path3")
+        && childrenPaths.contains("/path3/subdir")
+        && !childrenPaths.contains("/path3-subdir"));
+  }
+
+  @Test
+  public void testGetQuotaUsage() {
+    RouterQuotaUsage quotaGet;
+
+    // test case 1: get quota for a non-existent path
+    quotaGet = manager.getQuotaUsage("/non-exist-path");
+    assertNull(quotaGet);
+
+    // test case 2: get quota for a path with no quota set
+    RouterQuotaUsage.Builder quota = new RouterQuotaUsage.Builder()
+        .quota(HdfsConstants.QUOTA_DONT_SET)
+        .spaceQuota(HdfsConstants.QUOTA_DONT_SET);
+    manager.put("/noQuotaSet", quota.build());
+    quotaGet = manager.getQuotaUsage("/noQuotaSet");
+    // it should return null
+    assertNull(quotaGet);
+
+    // test case 3: get quota for a path with a quota set
+    quota.quota(1);
+    quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET);
+    manager.put("/hasQuotaSet", quota.build());
+    quotaGet = manager.getQuotaUsage("/hasQuotaSet");
+    assertEquals(1, quotaGet.getQuota());
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota());
+
+    // test case 4: get quota for a non-existent child path
+    quotaGet = manager.getQuotaUsage("/hasQuotaSet/file");
+    // it returns the quota of the nearest ancestor that has a quota set
+    assertEquals(1, quotaGet.getQuota());
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota());
+
+    // test case 5: get quota for a child path whose parent
+    // has no quota set
+    quota.quota(HdfsConstants.QUOTA_DONT_SET);
+    quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET);
+    manager.put("/hasQuotaSet/noQuotaSet", quota.build());
+    // this should return the quota of /hasQuotaSet
+    // (the nearest ancestor that has a quota set)
+    quotaGet = manager.getQuotaUsage("/hasQuotaSet/noQuotaSet/file");
+    assertEquals(1, quotaGet.getQuota());
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota());
+
+    // test case 6: get quota for a child path whose parent has a quota set
+    quota.quota(2);
+    quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET);
+    manager.put("/hasQuotaSet/hasQuotaSet", quota.build());
+    // this should return the quota of /hasQuotaSet/hasQuotaSet
+    quotaGet = manager.getQuotaUsage("/hasQuotaSet/hasQuotaSet/file");
+    assertEquals(2, quotaGet.getQuota());
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota());
+  }
+}
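
Taken together, testGetChildrenPaths and testGetQuotaUsage pin down two rules: prefix matching must respect path boundaries (so "/path3-subdir" is not a child of "/path3"), and quota lookups fall back to the nearest ancestor that has a real quota set. A self-contained sketch of those two rules, using a plain TreeMap instead of RouterQuotaManager's actual internals and a hypothetical negative sentinel standing in for HdfsConstants.QUOTA_DONT_SET:

  import java.util.Map;
  import java.util.Set;
  import java.util.TreeMap;
  import java.util.TreeSet;

  public class QuotaLookupSketch {
    // Hypothetical stand-in for HdfsConstants.QUOTA_DONT_SET.
    static final long QUOTA_DONT_SET = -1;

    private final Map<String, Long> quotas = new TreeMap<>();

    void put(String path, long quota) {
      quotas.put(path, quota);
    }

    // Fall back to the nearest ancestor whose quota is actually set.
    Long getQuota(String path) {
      for (String p = path; p != null; p = parent(p)) {
        Long q = quotas.get(p);
        if (q != null && q != QUOTA_DONT_SET) {
          return q;
        }
      }
      return null; // no ancestor has a quota set
    }

    // A path is a child only if it equals the prefix or continues past a
    // '/' boundary, so "/path3-subdir" is excluded for prefix "/path3".
    Set<String> getPaths(String prefix) {
      Set<String> result = new TreeSet<>();
      for (String p : quotas.keySet()) {
        if (p.equals(prefix) || p.startsWith(prefix + "/")) {
          result.add(p);
        }
      }
      return result;
    }

    private static String parent(String path) {
      int idx = path.lastIndexOf('/');
      return idx > 0 ? path.substring(0, idx) : null;
    }
  }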

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
new file mode 100644
index 0000000..61e7657
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test retry behavior of the Router RPC Client.
+ */
+public class TestRouterRPCClientRetries {
+
+  private static StateStoreDFSCluster cluster;
+  private static NamenodeContext nnContext1;
+  private static RouterContext routerContext;
+  private static MembershipNamenodeResolver resolver;
+  private static ClientProtocol routerProtocol;
+
+  @Before
+  public void setUp() throws Exception {
+    // Build and start a federated cluster
+    cluster = new StateStoreDFSCluster(false, 2);
+    Configuration routerConf = new RouterConfigBuilder()
+        .stateStore()
+        .admin()
+        .rpc()
+        .build();
+
+    // reduce the IPC client connection retry count and retry interval
+    Configuration clientConf = new Configuration(false);
+    clientConf.setInt(
+        CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+    clientConf.setInt(
+        CommonConfigurationKeys.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, 100);
+
+    cluster.addRouterOverrides(routerConf);
+    // override some settings for the client
+    cluster.startCluster(clientConf);
+    cluster.startRouters();
+    cluster.waitClusterUp();
+
+    nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null);
+    routerContext = cluster.getRandomRouter();
+    resolver = (MembershipNamenodeResolver) routerContext.getRouter()
+        .getNamenodeResolver();
+    routerProtocol = routerContext.getClient().getNamenode();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.stopRouter(routerContext);
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void testRetryWhenAllNameServiceDown() throws Exception {
+    // shutdown the dfs cluster
+    MiniDFSCluster dfsCluster = cluster.getCluster();
+    dfsCluster.shutdown();
+
+    // register an invalid namenode report
+    registerInvalidNameReport();
+
+    // Create a directory via the router
+    String dirPath = "/testRetryWhenClusterisDown";
+    FsPermission permission = new FsPermission("705");
+    try {
+      routerProtocol.mkdirs(dirPath, permission, false);
+      fail("Should have thrown RemoteException error.");
+    } catch (RemoteException e) {
+      String ns0 = cluster.getNameservices().get(0);
+      GenericTestUtils.assertExceptionContains(
+          "No namenode available under nameservice " + ns0, e);
+    }
+
+    // Verify the retry count; the operation should retry exactly once.
+    FederationRPCMetrics rpcMetrics = routerContext.getRouter()
+        .getRpcServer().getRPCMetrics();
+    assertEquals(1, rpcMetrics.getProxyOpRetries());
+  }
+
+  @Test
+  public void testRetryWhenOneNameServiceDown() throws Exception {
+    // shutdown the dfs cluster
+    MiniDFSCluster dfsCluster = cluster.getCluster();
+    dfsCluster.shutdownNameNode(0);
+
+    // register an invalid namenode report
+    registerInvalidNameReport();
+
+    DFSClient client = nnContext1.getClient();
+    // Renew the lease for the DFS client; this should succeed.
+    routerProtocol.renewLease(client.getClientName());
+
+    // Verify the retry count; the operation retries once for ns0.
+    FederationRPCMetrics rpcMetrics = routerContext.getRouter()
+        .getRpcServer().getRPCMetrics();
+    assertEquals(1, rpcMetrics.getProxyOpRetries());
+  }
+
+  /**
+   * Register an invalid namenode report for the first namenode of ns0.
+   * @throws IOException If the report cannot be registered.
+   */
+  private void registerInvalidNameReport() throws IOException {
+    String ns0 = cluster.getNameservices().get(0);
+    List<? extends FederationNamenodeContext> origin = resolver
+        .getNamenodesForNameserviceId(ns0);
+    FederationNamenodeContext nnInfo = origin.get(0);
+    NamenodeStatusReport report = new NamenodeStatusReport(ns0,
+        nnInfo.getNamenodeId(), nnInfo.getRpcAddress(),
+        nnInfo.getServiceAddress(), nnInfo.getLifelineAddress(),
+        nnInfo.getWebAddress());
+    report.setRegistrationValid(false);
+    assertTrue(resolver.registerNamenode(report));
+    resolver.loadCache(true);
+  }
+}
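
One note on asserting getProxyOpRetries(): the tests above assert the metric immediately after the RPC returns. If that ever proves flaky on a slow environment, a polling variant is more defensive; a sketch using GenericTestUtils.waitFor, which is already imported in this test (Supplier here is assumed to be com.google.common.base.Supplier, as elsewhere in this change):

  final FederationRPCMetrics rpcMetrics = routerContext.getRouter()
      .getRpcServer().getRPCMetrics();
  // Poll every 100 ms until the retry count reaches 1; time out after 10 s.
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      return rpcMetrics.getProxyOpRetries() == 1;
    }
  }, 100, 10000);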

