Repository: hadoop
Updated Branches:
  refs/heads/HDFS-13891 7d7e88c30 -> c9ebaf2d3


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9ebaf2d/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java
new file mode 100644
index 0000000..c90e614
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import 
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import 
org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This test class verifies that mount table cache is updated on all the 
routers
+ * when MountTableRefreshService is enabled and there is a change in mount 
table
+ * entries.
+ */
+public class TestRouterMountTableCacheRefresh {
+  private static TestingServer curatorTestingServer;
+  private static MiniRouterDFSCluster cluster;
+  private static RouterContext routerContext;
+  private static MountTableManager mountTableManager;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    curatorTestingServer = new TestingServer();
+    curatorTestingServer.start();
+    final String connectString = curatorTestingServer.getConnectString();
+    int numNameservices = 2;
+    cluster = new MiniRouterDFSCluster(false, numNameservices);
+    Configuration conf = new RouterConfigBuilder().refreshCache().admin().rpc()
+        .heartbeat().build();
+    conf.setClass(RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+        RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT,
+        FileSubclusterResolver.class);
+    conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
+    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true);
+    cluster.addRouterOverrides(conf);
+    cluster.startCluster();
+    cluster.startRouters();
+    cluster.waitClusterUp();
+    routerContext = cluster.getRandomRouter();
+    RouterStore routerStateManager =
+        routerContext.getRouter().getRouterStateManager();
+    mountTableManager = routerContext.getAdminClient().getMountTableManager();
+    // wait for one minute for all the routers to get registered
+    FederationTestUtils.waitRouterRegistered(routerStateManager,
+        numNameservices, 60000);
+  }
+
+  @AfterClass
+  public static void destory() {
+    try {
+      curatorTestingServer.close();
+      cluster.shutdown();
+    } catch (IOException e) {
+      // do nothing
+    }
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    clearEntries();
+  }
+
+  private void clearEntries() throws IOException {
+    List<MountTable> result = getMountTableEntries();
+    for (MountTable mountTable : result) {
+      RemoveMountTableEntryResponse removeMountTableEntry =
+          mountTableManager.removeMountTableEntry(RemoveMountTableEntryRequest
+              .newInstance(mountTable.getSourcePath()));
+      assertTrue(removeMountTableEntry.getStatus());
+    }
+  }
+
+  /**
+   * addMountTableEntry API should internally update the cache on all the
+   * routers.
+   */
+  @Test
+  public void testMountTableEntriesCacheUpdatedAfterAddAPICall()
+      throws IOException {
+
+    // Existing mount table size
+    int existingEntriesCount = getNumMountTableEntries();
+    String srcPath = "/addPath";
+    MountTable newEntry = MountTable.newInstance(srcPath,
+        Collections.singletonMap("ns0", "/addPathDest"), Time.now(),
+        Time.now());
+    addMountTableEntry(mountTableManager, newEntry);
+
+    // When Add entry is done, all the routers must have updated its mount 
table
+    // entry
+    List<RouterContext> routers = getRouters();
+    for (RouterContext rc : routers) {
+      List<MountTable> result =
+          getMountTableEntries(rc.getAdminClient().getMountTableManager());
+      assertEquals(1 + existingEntriesCount, result.size());
+      MountTable mountTableResult = result.get(0);
+      assertEquals(srcPath, mountTableResult.getSourcePath());
+    }
+  }
+
+  /**
+   * removeMountTableEntry API should internally update the cache on all the
+   * routers.
+   */
+  @Test
+  public void testMountTableEntriesCacheUpdatedAfterRemoveAPICall()
+      throws IOException {
+    // add
+    String srcPath = "/removePathSrc";
+    MountTable newEntry = MountTable.newInstance(srcPath,
+        Collections.singletonMap("ns0", "/removePathDest"), Time.now(),
+        Time.now());
+    addMountTableEntry(mountTableManager, newEntry);
+    int addCount = getNumMountTableEntries();
+    assertEquals(1, addCount);
+
+    // remove
+    RemoveMountTableEntryResponse removeMountTableEntry =
+        mountTableManager.removeMountTableEntry(
+            RemoveMountTableEntryRequest.newInstance(srcPath));
+    assertTrue(removeMountTableEntry.getStatus());
+
+    int removeCount = getNumMountTableEntries();
+    assertEquals(addCount - 1, removeCount);
+  }
+
+  /**
+   * updateMountTableEntry API should internally update the cache on all the
+   * routers.
+   */
+  @Test
+  public void testMountTableEntriesCacheUpdatedAfterUpdateAPICall()
+      throws IOException {
+    // add
+    String srcPath = "/updatePathSrc";
+    MountTable newEntry = MountTable.newInstance(srcPath,
+        Collections.singletonMap("ns0", "/updatePathDest"), Time.now(),
+        Time.now());
+    addMountTableEntry(mountTableManager, newEntry);
+    int addCount = getNumMountTableEntries();
+    assertEquals(1, addCount);
+
+    // update
+    String key = "ns1";
+    String value = "/updatePathDest2";
+    MountTable upateEntry = MountTable.newInstance(srcPath,
+        Collections.singletonMap(key, value), Time.now(), Time.now());
+    UpdateMountTableEntryResponse updateMountTableEntry =
+        mountTableManager.updateMountTableEntry(
+            UpdateMountTableEntryRequest.newInstance(upateEntry));
+    assertTrue(updateMountTableEntry.getStatus());
+    MountTable updatedMountTable = getMountTableEntry(srcPath);
+    assertNotNull("Updated mount table entrty cannot be null",
+        updatedMountTable);
+    assertEquals(1, updatedMountTable.getDestinations().size());
+    assertEquals(key,
+        updatedMountTable.getDestinations().get(0).getNameserviceId());
+    assertEquals(value, updatedMountTable.getDestinations().get(0).getDest());
+  }
+
+  /**
+   * After caching RouterClient if router goes down, refresh should be
+   * successful on other available router. The router which is not running
+   * should be ignored.
+   */
+  @Test
+  public void testCachedRouterClientBehaviourAfterRouterStoped()
+      throws IOException {
+    String srcPath = "/addPathClientCache";
+    MountTable newEntry = MountTable.newInstance(srcPath,
+        Collections.singletonMap("ns0", "/addPathClientCacheDest"), Time.now(),
+        Time.now());
+    addMountTableEntry(mountTableManager, newEntry);
+
+    // When Add entry is done, all the routers must have updated its mount 
table
+    // entry
+    List<RouterContext> routers = getRouters();
+    for (RouterContext rc : routers) {
+      List<MountTable> result =
+          getMountTableEntries(rc.getAdminClient().getMountTableManager());
+      assertEquals(1, result.size());
+      MountTable mountTableResult = result.get(0);
+      assertEquals(srcPath, mountTableResult.getSourcePath());
+    }
+
+    // Lets stop one router
+    for (RouterContext rc : routers) {
+      InetSocketAddress adminServerAddress =
+          rc.getRouter().getAdminServerAddress();
+      if (!routerContext.getRouter().getAdminServerAddress()
+          .equals(adminServerAddress)) {
+        cluster.stopRouter(rc);
+        break;
+      }
+    }
+
+    srcPath = "/addPathClientCache2";
+    newEntry = MountTable.newInstance(srcPath,
+        Collections.singletonMap("ns0", "/addPathClientCacheDest2"), 
Time.now(),
+        Time.now());
+    addMountTableEntry(mountTableManager, newEntry);
+    for (RouterContext rc : getRouters()) {
+      List<MountTable> result =
+          getMountTableEntries(rc.getAdminClient().getMountTableManager());
+      assertEquals(2, result.size());
+    }
+  }
+
+  private List<RouterContext> getRouters() {
+    List<RouterContext> result = new ArrayList<>();
+    for (RouterContext rc : cluster.getRouters()) {
+      if (rc.getRouter().getServiceState() == STATE.STARTED) {
+        result.add(rc);
+      }
+    }
+    return result;
+  }
+
+  @Test
+  public void testRefreshMountTableEntriesAPI() throws IOException {
+    RefreshMountTableEntriesRequest request =
+        RefreshMountTableEntriesRequest.newInstance();
+    RefreshMountTableEntriesResponse refreshMountTableEntriesRes =
+        mountTableManager.refreshMountTableEntries(request);
+    // refresh should be successful
+    assertTrue(refreshMountTableEntriesRes.getResult());
+  }
+
+  /**
+   * Verify cache update timeouts when any of the router takes more time than
+   * the configured timeout period.
+   */
+  @Test(timeout = 10000)
+  public void testMountTableEntriesCacheUpdateTimeout() throws IOException {
+    // Resources will be closed when router is closed
+    @SuppressWarnings("resource")
+    MountTableRefresherService mountTableRefresherService =
+        new MountTableRefresherService(routerContext.getRouter()) {
+          @Override
+          protected MountTableRefresherThread getLocalRefresher(
+              String adminAddress) {
+            return new MountTableRefresherThread(null, adminAddress) {
+              @Override
+              public void run() {
+                try {
+                  // Sleep 1 minute
+                  Thread.sleep(60000);
+                } catch (InterruptedException e) {
+                  // Do nothing
+                }
+              }
+            };
+          }
+        };
+    Configuration config = routerContext.getRouter().getConfig();
+    config.setTimeDuration(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT, 5,
+        TimeUnit.SECONDS);
+    mountTableRefresherService.init(config);
+    // One router is not responding for 1 minute, still refresh should
+    // finished in 5 second as cache update timeout is set 5 second.
+    mountTableRefresherService.refresh();
+    // Test case timeout is assert for this test case.
+  }
+
+  /**
+   * Verify Cached RouterClient connections are removed from cache and closed
+   * when their max live time is elapsed.
+   */
+  @Test
+  public void testRouterClientConnectionExpiration() throws Exception {
+    final AtomicInteger createCounter = new AtomicInteger();
+    final AtomicInteger removeCounter = new AtomicInteger();
+    // Resources will be closed when router is closed
+    @SuppressWarnings("resource")
+    MountTableRefresherService mountTableRefresherService =
+        new MountTableRefresherService(routerContext.getRouter()) {
+          @Override
+          protected void closeRouterClient(RouterClient client) {
+            super.closeRouterClient(client);
+            removeCounter.incrementAndGet();
+          }
+
+          @Override
+          protected RouterClient createRouterClient(
+              InetSocketAddress routerSocket, Configuration config)
+              throws IOException {
+            createCounter.incrementAndGet();
+            return super.createRouterClient(routerSocket, config);
+          }
+        };
+    int clientCacheTime = 2000;
+    Configuration config = routerContext.getRouter().getConfig();
+    config.setTimeDuration(
+        RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME, 
clientCacheTime,
+        TimeUnit.MILLISECONDS);
+    mountTableRefresherService.init(config);
+    // Do refresh to created RouterClient
+    mountTableRefresherService.refresh();
+    assertNotEquals("No RouterClient is created.", 0, createCounter.get());
+    /*
+     * Wait for clients to expire. Lets wait triple the cache eviction period.
+     * After cache eviction period all created client must be removed and
+     * closed.
+     */
+    GenericTestUtils.waitFor(() -> createCounter.get() == removeCounter.get(),
+        100, 3 * clientCacheTime);
+  }
+
+  private int getNumMountTableEntries() throws IOException {
+    List<MountTable> records = getMountTableEntries();
+    int oldEntriesCount = records.size();
+    return oldEntriesCount;
+  }
+
+  private MountTable getMountTableEntry(String srcPath) throws IOException {
+    List<MountTable> mountTableEntries = getMountTableEntries();
+    for (MountTable mountTable : mountTableEntries) {
+      String sourcePath = mountTable.getSourcePath();
+      if (srcPath.equals(sourcePath)) {
+        return mountTable;
+      }
+    }
+    return null;
+  }
+
+  private void addMountTableEntry(MountTableManager mountTableMgr,
+      MountTable newEntry) throws IOException {
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(newEntry);
+    AddMountTableEntryResponse addResponse =
+        mountTableMgr.addMountTableEntry(addRequest);
+    assertTrue(addResponse.getStatus());
+  }
+
+  private List<MountTable> getMountTableEntries() throws IOException {
+    return getMountTableEntries(mountTableManager);
+  }
+
+  private List<MountTable> getMountTableEntries(
+      MountTableManager mountTableManagerParam) throws IOException {
+    GetMountTableEntriesRequest request =
+        GetMountTableEntriesRequest.newInstance("/");
+    return mountTableManagerParam.getMountTableEntries(request).getEntries();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9ebaf2d/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md 
b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index 012d4ae..1c2c770 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -437,6 +437,7 @@ Usage:
           [-safemode enter | leave | get]
           [-nameservice disable | enable <nameservice>]
           [-getDisabledNameservices]
+          [-refresh]
 
 | COMMAND\_OPTION | Description |
 |:---- |:---- |
@@ -449,6 +450,7 @@ Usage:
 | `-safemode` `enter` `leave` `get` | Manually set the Router entering or 
leaving safe mode. The option *get* will be used for verifying if the Router is 
in safe mode state. |
 | `-nameservice` `disable` `enable` *nameservice* | Disable/enable  a name 
service from the federation. If disabled, requests will not go to that name 
service. |
 | `-getDisabledNameservices` | Get the name services that are disabled in the 
federation. |
+| `-refresh` | Update mount table cache of the connected router. |
 
 The commands for managing Router-based federation. See [Mount table 
management](../hadoop-hdfs-rbf/HDFSRouterFederation.html#Mount_table_management)
 for more info.
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to