Repository: hadoop
Updated Branches:
  refs/heads/branch-3.1 9a86cb049 -> 5eb32396d


HDFS-13384. RBF: Improve timeout RPC call mechanism. Contributed by Inigo Goiri.

(cherry picked from commit e87be8a2a49573897e40bfdf43541e3635e35c98)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5eb32396
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5eb32396
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5eb32396

Branch: refs/heads/branch-3.1
Commit: 5eb32396d785bbabfd98199e5d1ed7dcb04a9895
Parents: 9a86cb0
Author: Yiqun Lin <yq...@apache.org>
Authored: Tue Apr 10 15:34:42 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Tue Apr 10 15:36:48 2018 +0800

----------------------------------------------------------------------
 .../federation/metrics/NamenodeBeanMetrics.java |   3 +
 .../federation/router/RouterRpcClient.java      |   2 +-
 .../router/SubClusterTimeoutException.java      |  33 +++++
 .../server/federation/MiniRouterDFSCluster.java |  31 ++++-
 .../router/TestRouterRPCClientRetries.java      | 126 ++++++++++++++++++-
 5 files changed, 192 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eb32396/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
index e8c6c82..4d22ae7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.router.Router;
 import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.SubClusterTimeoutException;
 import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
@@ -396,6 +397,8 @@ public class NamenodeBeanMetrics
       }
     } catch (StandbyException e) {
       LOG.error("Cannot get {} nodes, Router in safe mode", type);
+    } catch (SubClusterTimeoutException e) {
+      LOG.error("Cannot get {} nodes, subclusters timed out responding", type);
     } catch (IOException e) {
       LOG.error("Cannot get " + type + " nodes", e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eb32396/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 4723b4c..e2c9cb4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -1007,7 +1007,7 @@ public class RouterRpcClient {
           String msg =
               "Invocation to \"" + loc + "\" for \"" + method + "\" timed out";
           LOG.error(msg);
-          IOException ioe = new IOException(msg);
+          IOException ioe = new SubClusterTimeoutException(msg);
           exceptions.put(location, ioe);
         } catch (ExecutionException ex) {
           Throwable cause = ex.getCause();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eb32396/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/SubClusterTimeoutException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/SubClusterTimeoutException.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/SubClusterTimeoutException.java
new file mode 100644
index 0000000..dac5bd6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/SubClusterTimeoutException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+
+
+/**
+ * Exception when timing out waiting for the reply of a subcluster.
+ */
+public class SubClusterTimeoutException extends IOException {
+
+  private static final long serialVersionUID = 1L;
+
+  public SubClusterTimeoutException(String msg) {
+    super(msg);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eb32396/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
index 0ad8536..0a4de33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.federation;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
@@ -132,6 +133,9 @@ public class MiniRouterDFSCluster {
   /** Namenode configuration overrides. */
   private Configuration namenodeOverrides;
 
+  /** If the DNs are shared. */
+  private boolean sharedDNs = true;
+
 
   /**
    * Router context.
@@ -558,6 +562,13 @@ public class MiniRouterDFSCluster {
     this.numDatanodesPerNameservice = num;
   }
 
+  /**
+   * Set the DNs to belong to only one subcluster.
+   */
+  public void setIndependentDNs() {
+    this.sharedDNs = false;
+  }
+
   public String getNameservicesKey() {
     StringBuilder sb = new StringBuilder();
     for (String nsId : this.nameservices) {
@@ -677,15 +688,33 @@ public class MiniRouterDFSCluster {
       }
       topology.setFederation(true);
 
+      // Set independent DNs across subclusters
+      int numDNs = nameservices.size() * numDatanodesPerNameservice;
+      Configuration[] dnConfs = null;
+      if (!sharedDNs) {
+        dnConfs = new Configuration[numDNs];
+        int dnId = 0;
+        for (String nsId : nameservices) {
+          Configuration subclusterConf = new Configuration();
+          subclusterConf.set(DFS_INTERNAL_NAMESERVICES_KEY, nsId);
+          for (int i = 0; i < numDatanodesPerNameservice; i++) {
+            dnConfs[dnId] = subclusterConf;
+            dnId++;
+          }
+        }
+      }
+
       // Start mini DFS cluster
       String ns0 = nameservices.get(0);
       Configuration nnConf = generateNamenodeConfiguration(ns0);
       if (overrideConf != null) {
         nnConf.addResource(overrideConf);
       }
+
       cluster = new MiniDFSCluster.Builder(nnConf)
-          .numDataNodes(nameservices.size() * numDatanodesPerNameservice)
+          .numDataNodes(numDNs)
           .nnTopology(topology)
+          .dataNodeConfOverlays(dnConfs)
           .build();
       cluster.waitActive();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eb32396/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
index 1e0f9a1..91dc2e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
@@ -18,11 +18,16 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -30,40 +35,65 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
+import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Supplier;
 
 /**
  * Test retry behavior of the Router RPC Client.
  */
 public class TestRouterRPCClientRetries {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterRPCClientRetries.class);
+
   private static StateStoreDFSCluster cluster;
   private static NamenodeContext nnContext1;
   private static RouterContext routerContext;
   private static MembershipNamenodeResolver resolver;
   private static ClientProtocol routerProtocol;
 
+  @Rule
+  public final Timeout testTimeout = new Timeout(100000);
+
   @Before
   public void setUp() throws Exception {
     // Build and start a federated cluster
     cluster = new StateStoreDFSCluster(false, 2);
     Configuration routerConf = new RouterConfigBuilder()
         .stateStore()
+        .metrics()
         .admin()
         .rpc()
         .build();
+    routerConf.setTimeDuration(
+        NamenodeBeanMetrics.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
 
     // reduce IPC client connection retry times and interval time
     Configuration clientConf = new Configuration(false);
@@ -72,6 +102,9 @@ public class TestRouterRPCClientRetries {
     clientConf.setInt(
         CommonConfigurationKeys.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, 100);
 
+    // Set the DNs to belong to only one subcluster
+    cluster.setIndependentDNs();
+
     cluster.addRouterOverrides(routerConf);
     // override some settings for the client
     cluster.startCluster(clientConf);
@@ -157,4 +190,95 @@ public class TestRouterRPCClientRetries {
     assertTrue(resolver.registerNamenode(report));
     resolver.loadCache(true);
   }
+
+  @Test
+  public void testNamenodeMetricsSlow() throws Exception {
+    final Router router = routerContext.getRouter();
+    final NamenodeBeanMetrics metrics = router.getNamenodeMetrics();
+
+    // Initially, there are 4 DNs in total
+    final String jsonString0 = metrics.getLiveNodes();
+    assertEquals(4, getNumDatanodes(jsonString0));
+
+    // The response should be cached
+    assertEquals(jsonString0, metrics.getLiveNodes());
+
+    // Check that the cached value gets updated eventually
+    waitUpdateLiveNodes(jsonString0, metrics);
+    final String jsonString2 = metrics.getLiveNodes();
+    assertNotEquals(jsonString0, jsonString2);
+    assertEquals(4, getNumDatanodes(jsonString2));
+
+    // Making subcluster0 slow to reply, should only get DNs from nn1
+    MiniDFSCluster dfsCluster = cluster.getCluster();
+    NameNode nn0 = dfsCluster.getNameNode(0);
+    simulateNNSlow(nn0);
+    waitUpdateLiveNodes(jsonString2, metrics);
+    final String jsonString3 = metrics.getLiveNodes();
+    assertEquals(2, getNumDatanodes(jsonString3));
+
+    // Making subcluster1 slow to reply, shouldn't get any DNs
+    NameNode nn1 = dfsCluster.getNameNode(1);
+    simulateNNSlow(nn1);
+    waitUpdateLiveNodes(jsonString3, metrics);
+    final String jsonString4 = metrics.getLiveNodes();
+    assertEquals(0, getNumDatanodes(jsonString4));
+  }
+
+  /**
+   * Get the number of nodes in a JSON string.
+   * @param jsonString JSON string containing nodes.
+   * @return Number of nodes.
+   * @throws JSONException If the JSON string is not properly formed.
+   */
+  private static int getNumDatanodes(final String jsonString)
+      throws JSONException {
+    JSONObject jsonObject = new JSONObject(jsonString);
+    if (jsonObject.length() == 0) {
+      return 0;
+    }
+    return jsonObject.names().length();
+  }
+
+  /**
+   * Wait until the cached live nodes value is updated.
+   * @param oldValue Old cached value.
+   * @param metrics Namenode metrics beans to get the live nodes from.
+   * @throws Exception If it cannot wait.
+   */
+  private static void waitUpdateLiveNodes(
+      final String oldValue, final NamenodeBeanMetrics metrics)
+          throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return !oldValue.equals(metrics.getLiveNodes());
+      }
+    }, 500, 5 * 1000);
+  }
+
+  /**
+   * Simulate that a Namenode is slow by adding a sleep to the check operation
+   * in the NN.
+   * @param nn Namenode to simulate slow.
+   * @throws Exception If we cannot add the sleep time.
+   */
+  private static void simulateNNSlow(final NameNode nn) throws Exception {
+    FSNamesystem namesystem = nn.getNamesystem();
+    HAContext haContext = namesystem.getHAContext();
+    HAContext spyHAContext = spy(haContext);
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        LOG.info("Simulating slow namenode {}", invocation.getMock());
+        try {
+          Thread.sleep(3 * 1000);
+        } catch(InterruptedException e) {
+          LOG.error("Simulating a slow namenode aborted");
+        }
+        return null;
+      }
+    }).when(spyHAContext).checkOperation(any(OperationCategory.class));
+    Whitebox.setInternalState(namesystem, "haContext", spyHAContext);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to