Repository: hadoop
Updated Branches:
  refs/heads/trunk 1d37cf675 -> 8896d20b9


HDFS-13119. RBF: Manage unavailable clusters. Contributed by Yiqun Lin.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8896d20b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8896d20b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8896d20b

Branch: refs/heads/trunk
Commit: 8896d20b91520053a6bbfb680adb345cd24f4142
Parents: 1d37cf6
Author: Yiqun Lin <yq...@apache.org>
Authored: Tue Feb 20 09:37:08 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Tue Feb 20 09:37:08 2018 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   8 +
 .../federation/metrics/FederationRPCMBean.java  |   2 +
 .../metrics/FederationRPCMetrics.java           |  11 ++
 .../FederationRPCPerformanceMonitor.java        |  10 ++
 .../resolver/NamenodeStatusReport.java          |   8 +
 .../federation/router/RouterRpcClient.java      |  71 +++++++--
 .../federation/router/RouterRpcMonitor.java     |  13 ++
 .../federation/router/RouterRpcServer.java      |   9 ++
 .../src/main/resources/hdfs-default.xml         |  17 +++
 .../router/TestRouterRPCClientRetries.java      | 151 +++++++++++++++++++
 10 files changed, 289 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 0828957..bea38d2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1246,6 +1246,14 @@ public class DFSConfigKeys extends 
CommonConfigurationKeys {
   public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT =
       TimeUnit.SECONDS.toMillis(10);
 
+  // HDFS Router RPC client
+  public static final String DFS_ROUTER_CLIENT_THREADS_SIZE =
+      FEDERATION_ROUTER_PREFIX + "client.thread-size";
+  public static final int DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT = 32;
+  public static final String DFS_ROUTER_CLIENT_MAX_ATTEMPTS =
+      FEDERATION_ROUTER_PREFIX + "client.retry.max.attempts";
+  public static final int DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT = 3;
+
   // HDFS Router State Store connection
   public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
       FEDERATION_ROUTER_PREFIX + "file.resolver.client.class";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
index 00209e9..3e031fe 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
@@ -42,6 +42,8 @@ public interface FederationRPCMBean {
 
   long getProxyOpNotImplemented();
 
+  long getProxyOpRetries();
+
   long getRouterFailureStateStoreOps();
 
   long getRouterFailureReadOnlyOps();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
index 8995689..94d3383 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
@@ -56,6 +56,8 @@ public class FederationRPCMetrics implements 
FederationRPCMBean {
   private MutableCounterLong proxyOpFailureCommunicate;
   @Metric("Number of operations not implemented")
   private MutableCounterLong proxyOpNotImplemented;
+  @Metric("Number of operation retries")
+  private MutableCounterLong proxyOpRetries;
 
   @Metric("Failed requests due to State Store unavailable")
   private MutableCounterLong routerFailureStateStore;
@@ -126,6 +128,15 @@ public class FederationRPCMetrics implements 
FederationRPCMBean {
     return proxyOpNotImplemented.value();
   }
 
+  public void incrProxyOpRetries() {
+    proxyOpRetries.incr();
+  }
+
+  @Override
+  public long getProxyOpRetries() {
+    return proxyOpRetries.value();
+  }
+
   public void incrRouterFailureStateStore() {
     routerFailureStateStore.incr();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
index e3a16b5..547ebb5 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
@@ -159,6 +159,11 @@ public class FederationRPCPerformanceMonitor implements 
RouterRpcMonitor {
   }
 
   @Override
+  public void proxyOpRetries() {
+    metrics.incrProxyOpRetries();
+  }
+
+  @Override
   public void routerFailureStateStore() {
     metrics.incrRouterFailureStateStore();
   }
@@ -208,4 +213,9 @@ public class FederationRPCPerformanceMonitor implements 
RouterRpcMonitor {
     }
     return -1;
   }
+
+  @Override
+  public FederationRPCMetrics getRPCMetrics() {
+    return this.metrics;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
index d3c6d87..c3c6fa8 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
@@ -390,6 +390,14 @@ public class NamenodeStatusReport {
     return this.numOfBlocksPendingDeletion;
   }
 
+  /**
+   * Set the validity of registration.
+   * @param isValid The desired value to be set.
+   */
+  public void setRegistrationValid(boolean isValid) {
+    this.registrationValid = isValid;
+  }
+
   @Override
   public String toString() {
     return String.format("%s-%s:%s",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 4209a49..d3b7947 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -46,12 +46,14 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import 
org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import 
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import 
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -123,10 +125,14 @@ public class RouterRpcClient {
     this.connectionManager = new ConnectionManager(conf);
     this.connectionManager.start();
 
+    int numThreads = conf.getInt(
+        DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
+        DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT);
     ThreadFactory threadFactory = new ThreadFactoryBuilder()
         .setNameFormat("RPC Router Client-%d")
         .build();
-    this.executorService = Executors.newCachedThreadPool(threadFactory);
+    this.executorService = Executors.newFixedThreadPool(
+        numThreads, threadFactory);
 
     this.rpcMonitor = monitor;
 
@@ -134,8 +140,8 @@ public class RouterRpcClient {
         HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
         HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
     int maxRetryAttempts = conf.getInt(
-        HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
-        HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
+        DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS,
+        DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT);
     int failoverSleepBaseMillis = conf.getInt(
         HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
         HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
@@ -274,11 +280,24 @@ public class RouterRpcClient {
    *
    * @param ioe IOException reported.
    * @param retryCount Number of retries.
+   * @param nsId Nameservice ID.
    * @return Retry decision.
-   * @throws IOException Original exception if the retry policy generates one.
+   * @throws IOException Original exception if the retry policy generates one
+   *                     or IOException for no available namenodes.
    */
-  private RetryDecision shouldRetry(final IOException ioe, final int 
retryCount)
-      throws IOException {
+  private RetryDecision shouldRetry(final IOException ioe, final int 
retryCount,
+      final String nsId) throws IOException {
+    // check for the case of cluster unavailable state
+    if (isClusterUnAvailable(nsId)) {
+      // allow a single retry if the cluster is unavailable
+      if (retryCount == 0) {
+        return RetryDecision.RETRY;
+      } else {
+        throw new IOException("No namenode available under nameservice " + 
nsId,
+            ioe);
+      }
+    }
+
     try {
       final RetryPolicy.RetryAction a =
           this.retryPolicy.shouldRetry(ioe, retryCount, 0, true);
@@ -329,7 +348,7 @@ public class RouterRpcClient {
         connection = this.getConnection(ugi, nsId, rpcAddress);
         ProxyAndInfo<ClientProtocol> client = connection.getClient();
         ClientProtocol proxy = client.getProxy();
-        ret = invoke(0, method, proxy, params);
+        ret = invoke(nsId, 0, method, proxy, params);
         if (failover) {
           // Success on alternate server, update
           InetSocketAddress address = client.getAddress();
@@ -400,6 +419,8 @@ public class RouterRpcClient {
    * Re-throws exceptions generated by the remote RPC call as either
    * RemoteException or IOException.
    *
+   * @param nsId Identifier for the namespace
+   * @param retryCount Number of retries attempted so far
    * @param method Method to invoke
    * @param obj Target object for the method
    * @param params Variable parameters
@@ -407,8 +428,8 @@ public class RouterRpcClient {
    * @throws IOException
    * @throws InterruptedException
    */
-  private Object invoke(int retryCount, final Method method, final Object obj,
-      final Object... params) throws IOException {
+  private Object invoke(String nsId, int retryCount, final Method method,
+      final Object obj, final Object... params) throws IOException {
     try {
       return method.invoke(obj, params);
     } catch (IllegalAccessException e) {
@@ -421,11 +442,16 @@ public class RouterRpcClient {
       Throwable cause = e.getCause();
       if (cause instanceof IOException) {
         IOException ioe = (IOException) cause;
+
         // Check if we should retry.
-        RetryDecision decision = shouldRetry(ioe, retryCount);
+        RetryDecision decision = shouldRetry(ioe, retryCount, nsId);
         if (decision == RetryDecision.RETRY) {
+          if (this.rpcMonitor != null) {
+            this.rpcMonitor.proxyOpRetries();
+          }
+
           // retry
-          return invoke(++retryCount, method, obj, params);
+          return invoke(nsId, ++retryCount, method, obj, params);
         } else if (decision == RetryDecision.FAILOVER_AND_RETRY) {
           // failover, invoker looks for standby exceptions for failover.
           if (ioe instanceof StandbyException) {
@@ -448,6 +474,29 @@ public class RouterRpcClient {
   }
 
   /**
+   * Check if the cluster of given nameservice id is unavailable.
+   * @param nsId nameservice ID.
+   * @return true if no namenode of the nameservice is in active state.
+   * @throws IOException
+   */
+  private boolean isClusterUnAvailable(String nsId) throws IOException {
+    List<? extends FederationNamenodeContext> nnState = this.namenodeResolver
+        .getNamenodesForNameserviceId(nsId);
+
+    if (nnState != null) {
+      for (FederationNamenodeContext nnContext : nnState) {
+        // Once we find one NN is in active state, we assume this
+        // cluster is available.
+        if (nnContext.getState() == FederationNamenodeServiceState.ACTIVE) {
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
+  /**
    * Get a clean copy of the exception. Sometimes the exceptions returned by 
the
    * server contain the full stack trace in the message.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
index d889a56..df9aa11 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 
 /**
@@ -36,6 +37,12 @@ public interface RouterRpcMonitor {
       Configuration conf, RouterRpcServer server, StateStoreService store);
 
   /**
+   * Get Router RPC metrics info.
+   * @return The instance of FederationRPCMetrics.
+   */
+  FederationRPCMetrics getRPCMetrics();
+
+  /**
    * Close the monitor.
    */
   void close();
@@ -74,6 +81,12 @@ public interface RouterRpcMonitor {
   void proxyOpNotImplemented();
 
   /**
+   * Retry to proxy an operation to a Namenode because of an unexpected
+   * exception.
+   */
+  void proxyOpRetries();
+
+  /**
    * If the Router cannot contact the State Store in an operation.
    */
   void routerFailureStateStore();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 57125ca..e0dfeb4 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -105,6 +105,7 @@ import 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
 import 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
 import 
org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import 
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
 import 
org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@@ -2177,4 +2178,12 @@ public class RouterRpcServer extends AbstractService 
implements ClientProtocol {
   public Quota getQuotaModule() {
     return this.quotaCall;
   }
+
+  /**
+   * Get RPC metrics info.
+   * @return The instance of FederationRPCMetrics.
+   */
+  public FederationRPCMetrics getRPCMetrics() {
+    return this.rpcMonitor.getRPCMetrics();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index b61c418..d037b2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5234,4 +5234,21 @@
       is assumed.
     </description>
   </property>
+
+  <property>
+    <name>dfs.federation.router.client.thread-size</name>
+    <value>32</value>
+    <description>
+      Max threads size for the RouterClient to execute concurrent
+      requests.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.client.retry.max.attempts</name>
+    <value>3</value>
+    <description>
+      Max retry attempts for the RouterClient talking to the Namenode.
+    </description>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
new file mode 100644
index 0000000..dddcb5a
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import 
org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
+import 
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import 
org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test retry behavior of the Router RPC Client.
+ */
+public class TestRouterRPCClientRetries {
+
+  private static StateStoreDFSCluster cluster;
+  private static NamenodeContext nnContext1;
+  private static RouterContext routerContext;
+  private static MembershipNamenodeResolver resolver;
+  private static ClientProtocol routerProtocol;
+
+  @Before
+  public void setUp() throws Exception {
+    // Build and start a federated cluster
+    cluster = new StateStoreDFSCluster(false, 2);
+    Configuration routerConf = new RouterConfigBuilder()
+        .stateStore()
+        .admin()
+        .rpc()
+        .build();
+
+    cluster.addRouterOverrides(routerConf);
+    cluster.startCluster();
+    cluster.startRouters();
+    cluster.waitClusterUp();
+
+    nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null);
+    routerContext = cluster.getRandomRouter();
+    resolver = (MembershipNamenodeResolver) routerContext.getRouter()
+        .getNamenodeResolver();
+    routerProtocol = routerContext.getClient().getNamenode();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.stopRouter(routerContext);
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void testRetryWhenAllNameServiceDown() throws Exception {
+    // shutdown the dfs cluster
+    MiniDFSCluster dfsCluster = cluster.getCluster();
+    dfsCluster.shutdown();
+
+    // register an invalid namenode report
+    registerInvalidNameReport();
+
+    // Create a directory via the router
+    String dirPath = "/testRetryWhenClusterisDown";
+    FsPermission permission = new FsPermission("705");
+    try {
+      routerProtocol.mkdirs(dirPath, permission, false);
+      fail("Should have thrown RemoteException error.");
+    } catch (RemoteException e) {
+      String ns0 = cluster.getNameservices().get(0);
+      GenericTestUtils.assertExceptionContains(
+          "No namenode available under nameservice " + ns0, e);
+    }
+
+    // Verify the retry times, it should only retry one time.
+    FederationRPCMetrics rpcMetrics = routerContext.getRouter()
+        .getRpcServer().getRPCMetrics();
+    assertEquals(1, rpcMetrics.getProxyOpRetries());
+  }
+
+  @Test
+  public void testRetryWhenOneNameServiceDown() throws Exception {
+    // shutdown only the namenode with index 0
+    MiniDFSCluster dfsCluster = cluster.getCluster();
+    dfsCluster.shutdownNameNode(0);
+
+    // register an invalid namenode report
+    registerInvalidNameReport();
+
+    DFSClient client = nnContext1.getClient();
+    // Renew lease for the DFS client, it will succeed.
+    routerProtocol.renewLease(client.getClientName());
+
+    // Verify the retry times, it will retry one time for ns0.
+    FederationRPCMetrics rpcMetrics = routerContext.getRouter()
+        .getRpcServer().getRPCMetrics();
+    assertEquals(1, rpcMetrics.getProxyOpRetries());
+  }
+
+  /**
+   * Register an invalid namenode report.
+   * @throws IOException
+   */
+  private void registerInvalidNameReport() throws IOException {
+    String ns0 = cluster.getNameservices().get(0);
+    List<? extends FederationNamenodeContext> origin = resolver
+        .getNamenodesForNameserviceId(ns0);
+    FederationNamenodeContext nnInfo = origin.get(0);
+    NamenodeStatusReport report = new NamenodeStatusReport(ns0,
+        nnInfo.getNamenodeId(), nnInfo.getRpcAddress(),
+        nnInfo.getServiceAddress(), nnInfo.getLifelineAddress(),
+        nnInfo.getWebAddress());
+    report.setRegistrationValid(false);
+    assertTrue(resolver.registerNamenode(report));
+    resolver.loadCache(true);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to