HDFS-13347. RBF: Cache datanode reports. Contributed by Inigo Goiri.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a71656c1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a71656c1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a71656c1

Branch: refs/heads/HDFS-12943
Commit: a71656c1c1bf6c680f1382a76ddcac870061f320
Parents: d1e378d
Author: Yiqun Lin <yq...@apache.org>
Authored: Wed Mar 28 11:00:08 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Wed Mar 28 11:00:08 2018 +0800

----------------------------------------------------------------------
 .../federation/metrics/FederationMetrics.java   |  2 +-
 .../federation/metrics/NamenodeBeanMetrics.java | 68 +++++++++++++++++++-
 .../hdfs/server/federation/router/Router.java   | 13 ++++
 .../federation/router/RouterMetricsService.java |  9 +++
 .../federation/router/RouterRpcServer.java      | 16 ++---
 .../server/federation/router/TestRouterRpc.java | 41 +++++++++++-
 6 files changed, 135 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a71656c1/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
index a80c3be..a99a26a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
@@ -434,7 +434,7 @@ public class FederationMetrics implements FederationMBean {
     try {
       RouterRpcServer rpcServer = this.router.getRpcServer();
       DatanodeInfo[] live = rpcServer.getDatanodeReport(
-          DatanodeReportType.LIVE, TIME_OUT);
+          DatanodeReportType.LIVE, false, TIME_OUT);
 
       if (live.length > 0) {
         float totalDfsUsed = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a71656c1/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
index c4e5b5b..e8c6c82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
@@ -25,6 +25,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -32,6 +34,7 @@ import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -39,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.router.Router;
 import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
 import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
@@ -56,6 +60,10 @@ import org.eclipse.jetty.util.ajax.JSON;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 /**
  * Expose the Namenode metrics as the Router was one.
  */
@@ -65,6 +73,22 @@ public class NamenodeBeanMetrics
   private static final Logger LOG =
       LoggerFactory.getLogger(NamenodeBeanMetrics.class);
 
+  /** Prevent holding the page from loading too long. */
+  private static final String DN_REPORT_TIME_OUT =
+      RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "dn-report.time-out";
+  /** We only wait for 1 second. */
+  private static final long DN_REPORT_TIME_OUT_DEFAULT =
+      TimeUnit.SECONDS.toMillis(1);
+
+  /** Time to cache the DN information. */
+  public static final String DN_REPORT_CACHE_EXPIRE =
+      RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "dn-report.cache-expire";
+  /** We cache the DN information for 10 seconds by default. */
+  public static final long DN_REPORT_CACHE_EXPIRE_DEFAULT =
+      TimeUnit.SECONDS.toMillis(10);
+
+
+  /** Instance of the Router being monitored. */
   private final Router router;
 
   /** FSNamesystem bean. */
@@ -76,6 +100,11 @@ public class NamenodeBeanMetrics
   /** NameNodeStatus bean. */
   private ObjectName nnStatusBeanName;
 
+  /** Timeout to get the DN report. */
+  private final long dnReportTimeOut;
+  /** DN type -> full DN report in JSON. */
+  private final LoadingCache<DatanodeReportType, String> dnCache;
+
 
   public NamenodeBeanMetrics(Router router) {
     this.router = router;
@@ -114,6 +143,23 @@ public class NamenodeBeanMetrics
     } catch (NotCompliantMBeanException e) {
       throw new RuntimeException("Bad NameNodeStatus MBean setup", e);
     }
+
+    // Initialize the cache for the DN reports
+    Configuration conf = router.getConfig();
+    this.dnReportTimeOut = conf.getTimeDuration(
+        DN_REPORT_TIME_OUT, DN_REPORT_TIME_OUT_DEFAULT, TimeUnit.MILLISECONDS);
+    long dnCacheExpire = conf.getTimeDuration(
+        DN_REPORT_CACHE_EXPIRE,
+        DN_REPORT_CACHE_EXPIRE_DEFAULT, TimeUnit.MILLISECONDS);
+    this.dnCache = CacheBuilder.newBuilder()
+        .expireAfterWrite(dnCacheExpire, TimeUnit.MILLISECONDS)
+        .build(
+            new CacheLoader<DatanodeReportType, String>() {
+              @Override
+              public String load(DatanodeReportType type) throws Exception {
+                return getNodesImpl(type);
+              }
+            });
   }
 
   /**
@@ -299,16 +345,32 @@ public class NamenodeBeanMetrics
   }
 
   /**
+   * Get all the nodes in the federation from a particular type. Getting this
+   * information is expensive and we use a cache.
+   * @param type Type of the datanodes to check.
+   * @return JSON with the nodes.
+   */
+  private String getNodes(final DatanodeReportType type) {
+    try {
+      return this.dnCache.get(type);
+    } catch (ExecutionException e) {
+      LOG.error("Cannot get the DN storage report for {}", type, e);
+    }
+    // If we cannot get the report, return empty JSON
+    return "{}";
+  }
+
+  /**
    * Get all the nodes in the federation from a particular type.
-   * TODO this is expensive, we may want to cache it.
    * @param type Type of the datanodes to check.
    * @return JSON with the nodes.
    */
-  private String getNodes(DatanodeReportType type) {
+  private String getNodesImpl(final DatanodeReportType type) {
     final Map<String, Map<String, Object>> info = new HashMap<>();
     try {
       RouterRpcServer rpcServer = this.router.getRpcServer();
-      DatanodeInfo[] datanodes = rpcServer.getDatanodeReport(type);
+      DatanodeInfo[] datanodes =
+          rpcServer.getDatanodeReport(type, false, dnReportTimeOut);
       for (DatanodeInfo node : datanodes) {
         Map<String, Object> innerinfo = new HashMap<>();
         innerinfo.put("infoAddr", node.getInfoAddr());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a71656c1/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index 38f5d4f..df2a448 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
+import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
@@ -553,6 +554,18 @@ public class Router extends CompositeService {
   }
 
   /**
+   * Get the Namenode metrics.
+   *
+   * @return Namenode metrics.
+   */
+  public NamenodeBeanMetrics getNamenodeMetrics() {
+    if (this.metrics != null) {
+      return this.metrics.getNamenodeMetrics();
+    }
+    return null;
+  }
+
+  /**
    * Get the subcluster resolver for files.
    *
    * @return Subcluster resolver for files.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a71656c1/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java
index f4debce..1887ed6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java
@@ -95,6 +95,15 @@ public class RouterMetricsService extends AbstractService {
   }
 
   /**
+   * Get the Namenode metrics.
+   *
+   * @return Namenode metrics.
+   */
+  public NamenodeBeanMetrics getNamenodeMetrics() {
+    return this.nnMetrics;
+  }
+
+  /**
    * Get the JVM metrics for the Router.
    *
    * @return JVM metrics.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a71656c1/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index eaa3951..383fd77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -1237,18 +1237,20 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
   public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
       throws IOException {
     checkOperation(OperationCategory.UNCHECKED);
-    return getDatanodeReport(type, 0);
+    return getDatanodeReport(type, true, 0);
   }
 
   /**
    * Get the datanode report with a timeout.
    * @param type Type of the datanode.
+   * @param requireResponse If we require all the namespaces to report.
    * @param timeOutMs Time out for the reply in milliseconds.
    * @return List of datanodes.
    * @throws IOException If it cannot get the report.
    */
   public DatanodeInfo[] getDatanodeReport(
-      DatanodeReportType type, long timeOutMs) throws IOException {
+      DatanodeReportType type, boolean requireResponse, long timeOutMs)
+          throws IOException {
     checkOperation(OperationCategory.UNCHECKED);
 
     Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
@@ -1257,8 +1259,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
 
     Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
     Map<FederationNamespaceInfo, DatanodeInfo[]> results =
-        rpcClient.invokeConcurrent(
-            nss, method, true, false, timeOutMs, DatanodeInfo[].class);
+        rpcClient.invokeConcurrent(nss, method, requireResponse, false,
+            timeOutMs, DatanodeInfo[].class);
     for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
         results.entrySet()) {
       FederationNamespaceInfo ns = entry.getKey();
@@ -1278,9 +1280,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     }
     // Map -> Array
     Collection<DatanodeInfo> datanodes = datanodesMap.values();
-    DatanodeInfo[] combinedData = new DatanodeInfo[datanodes.size()];
-    combinedData = datanodes.toArray(combinedData);
-    return combinedData;
+    return toArray(datanodes, DatanodeInfo.class);
   }
 
   @Override // ClientProtocol
@@ -2295,7 +2295,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
    * @param clazz Class of the values.
    * @return Array with the values in set.
    */
-  private static <T> T[] toArray(Set<T> set, Class<T> clazz) {
+  private static <T> T[] toArray(Collection<T> set, Class<T> clazz) {
     @SuppressWarnings("unchecked")
     T[] combinedData = (T[]) Array.newInstance(clazz, set.size());
     combinedData = set.toArray(combinedData);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a71656c1/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index e8341a2..5014880 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -27,6 +27,7 @@ import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.TEST
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -45,6 +46,7 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
@@ -71,10 +73,11 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
@@ -83,6 +86,7 @@ import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.codehaus.jettison.json.JSONObject;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -150,7 +154,14 @@ public class TestRouterRpc {
     cluster.startCluster();
 
     // Start routers with only an RPC service
-    cluster.addRouterOverrides((new RouterConfigBuilder()).rpc().build());
+    Configuration routerConf = new RouterConfigBuilder()
+        .metrics()
+        .rpc()
+        .build();
+    // We decrease the DN cache times to make the test faster
+    routerConf.setTimeDuration(
+        NamenodeBeanMetrics.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    cluster.addRouterOverrides(routerConf);
     cluster.startRouters();
 
     // Register and verify all NNs with all routers
@@ -1032,6 +1043,32 @@ public class TestRouterRpc {
     assertEquals(statsNamenode.toString(), statsRouter.toString());
   }
 
+  @Test
+  public void testNamenodeMetrics() throws Exception {
+    final NamenodeBeanMetrics metrics =
+        router.getRouter().getNamenodeMetrics();
+    final String jsonString0 = metrics.getLiveNodes();
+
+    // We should have 12 nodes in total
+    JSONObject jsonObject = new JSONObject(jsonString0);
+    assertEquals(12, jsonObject.names().length());
+
+    // We should be caching this information
+    String jsonString1 = metrics.getLiveNodes();
+    assertEquals(jsonString0, jsonString1);
+
+    // We wait until the cached value is updated
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return !jsonString0.equals(metrics.getLiveNodes());
+      }
+    }, 500, 5 * 1000);
+
+    // The cache should be updated now
+    assertNotEquals(jsonString0, metrics.getLiveNodes());
+  }
+
   /**
    * Check the erasure coding policies in the Router and the Namenode.
    * @return The erasure coding policies.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to