HDFS-13347. RBF: Cache datanode reports. Contributed by Inigo Goiri.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a71656c1 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a71656c1 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a71656c1 Branch: refs/heads/HDFS-12943 Commit: a71656c1c1bf6c680f1382a76ddcac870061f320 Parents: d1e378d Author: Yiqun Lin <yq...@apache.org> Authored: Wed Mar 28 11:00:08 2018 +0800 Committer: Yiqun Lin <yq...@apache.org> Committed: Wed Mar 28 11:00:08 2018 +0800 ---------------------------------------------------------------------- .../federation/metrics/FederationMetrics.java | 2 +- .../federation/metrics/NamenodeBeanMetrics.java | 68 +++++++++++++++++++- .../hdfs/server/federation/router/Router.java | 13 ++++ .../federation/router/RouterMetricsService.java | 9 +++ .../federation/router/RouterRpcServer.java | 16 ++--- .../server/federation/router/TestRouterRpc.java | 41 +++++++++++- 6 files changed, 135 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a71656c1/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java index a80c3be..a99a26a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java @@ -434,7 +434,7 @@ public class FederationMetrics implements FederationMBean { try { RouterRpcServer rpcServer = 
this.router.getRpcServer(); DatanodeInfo[] live = rpcServer.getDatanodeReport( - DatanodeReportType.LIVE, TIME_OUT); + DatanodeReportType.LIVE, false, TIME_OUT); if (live.length > 0) { float totalDfsUsed = 0; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a71656c1/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java index c4e5b5b..e8c6c82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java @@ -25,6 +25,8 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -32,6 +34,7 @@ import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; import javax.management.StandardMBean; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -39,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.router.Router; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; @@ -56,6 +60,10 @@ import org.eclipse.jetty.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; + /** * Expose the Namenode metrics as the Router was one. */ @@ -65,6 +73,22 @@ public class NamenodeBeanMetrics private static final Logger LOG = LoggerFactory.getLogger(NamenodeBeanMetrics.class); + /** Prevent holding the page from loading too long. */ + private static final String DN_REPORT_TIME_OUT = + RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "dn-report.time-out"; + /** We only wait for 1 second. */ + private static final long DN_REPORT_TIME_OUT_DEFAULT = + TimeUnit.SECONDS.toMillis(1); + + /** Time to cache the DN information. */ + public static final String DN_REPORT_CACHE_EXPIRE = + RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "dn-report.cache-expire"; + /** We cache the DN information for 10 seconds by default. */ + public static final long DN_REPORT_CACHE_EXPIRE_DEFAULT = + TimeUnit.SECONDS.toMillis(10); + + + /** Instance of the Router being monitored. */ private final Router router; /** FSNamesystem bean. */ @@ -76,6 +100,11 @@ public class NamenodeBeanMetrics /** NameNodeStatus bean. */ private ObjectName nnStatusBeanName; + /** Timeout to get the DN report. */ + private final long dnReportTimeOut; + /** DN type -> full DN report in JSON. 
*/ + private final LoadingCache<DatanodeReportType, String> dnCache; + public NamenodeBeanMetrics(Router router) { this.router = router; @@ -114,6 +143,23 @@ public class NamenodeBeanMetrics } catch (NotCompliantMBeanException e) { throw new RuntimeException("Bad NameNodeStatus MBean setup", e); } + + // Initialize the cache for the DN reports + Configuration conf = router.getConfig(); + this.dnReportTimeOut = conf.getTimeDuration( + DN_REPORT_TIME_OUT, DN_REPORT_TIME_OUT_DEFAULT, TimeUnit.MILLISECONDS); + long dnCacheExpire = conf.getTimeDuration( + DN_REPORT_CACHE_EXPIRE, + DN_REPORT_CACHE_EXPIRE_DEFAULT, TimeUnit.MILLISECONDS); + this.dnCache = CacheBuilder.newBuilder() + .expireAfterWrite(dnCacheExpire, TimeUnit.MILLISECONDS) + .build( + new CacheLoader<DatanodeReportType, String>() { + @Override + public String load(DatanodeReportType type) throws Exception { + return getNodesImpl(type); + } + }); } /** @@ -299,16 +345,32 @@ public class NamenodeBeanMetrics } /** + * Get all the nodes in the federation from a particular type. Getting this + * information is expensive and we use a cache. + * @param type Type of the datanodes to check. + * @return JSON with the nodes. + */ + private String getNodes(final DatanodeReportType type) { + try { + return this.dnCache.get(type); + } catch (ExecutionException e) { + LOG.error("Cannot get the DN storage report for {}", type, e); + } + // If we cannot get the report, return empty JSON + return "{}"; + } + + /** * Get all the nodes in the federation from a particular type. - * TODO this is expensive, we may want to cache it. * @param type Type of the datanodes to check. * @return JSON with the nodes. 
*/ - private String getNodes(DatanodeReportType type) { + private String getNodesImpl(final DatanodeReportType type) { final Map<String, Map<String, Object>> info = new HashMap<>(); try { RouterRpcServer rpcServer = this.router.getRpcServer(); - DatanodeInfo[] datanodes = rpcServer.getDatanodeReport(type); + DatanodeInfo[] datanodes = + rpcServer.getDatanodeReport(type, false, dnReportTimeOut); for (DatanodeInfo node : datanodes) { Map<String, Object> innerinfo = new HashMap<>(); innerinfo.put("infoAddr", node.getInfoAddr()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a71656c1/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java index 38f5d4f..df2a448 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics; +import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.store.RouterStore; @@ -553,6 +554,18 @@ public class Router extends CompositeService { } /** + * Get the Namenode metrics. + * + * @return Namenode metrics. 
+ */ + public NamenodeBeanMetrics getNamenodeMetrics() { + if (this.metrics != null) { + return this.metrics.getNamenodeMetrics(); + } + return null; + } + + /** * Get the subcluster resolver for files. * * @return Subcluster resolver for files. http://git-wip-us.apache.org/repos/asf/hadoop/blob/a71656c1/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java index f4debce..1887ed6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java @@ -95,6 +95,15 @@ public class RouterMetricsService extends AbstractService { } /** + * Get the Namenode metrics. + * + * @return Namenode metrics. + */ + public NamenodeBeanMetrics getNamenodeMetrics() { + return this.nnMetrics; + } + + /** * Get the JVM metrics for the Router. * * @return JVM metrics. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a71656c1/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index eaa3951..383fd77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -1237,18 +1237,20 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) throws IOException { checkOperation(OperationCategory.UNCHECKED); - return getDatanodeReport(type, 0); + return getDatanodeReport(type, true, 0); } /** * Get the datanode report with a timeout. * @param type Type of the datanode. + * @param requireResponse If we require all the namespaces to report. * @param timeOutMs Time out for the reply in milliseconds. * @return List of datanodes. * @throws IOException If it cannot get the report. 
*/ public DatanodeInfo[] getDatanodeReport( - DatanodeReportType type, long timeOutMs) throws IOException { + DatanodeReportType type, boolean requireResponse, long timeOutMs) + throws IOException { checkOperation(OperationCategory.UNCHECKED); Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>(); @@ -1257,8 +1259,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); Map<FederationNamespaceInfo, DatanodeInfo[]> results = - rpcClient.invokeConcurrent( - nss, method, true, false, timeOutMs, DatanodeInfo[].class); + rpcClient.invokeConcurrent(nss, method, requireResponse, false, + timeOutMs, DatanodeInfo[].class); for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry : results.entrySet()) { FederationNamespaceInfo ns = entry.getKey(); @@ -1278,9 +1280,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { } // Map -> Array Collection<DatanodeInfo> datanodes = datanodesMap.values(); - DatanodeInfo[] combinedData = new DatanodeInfo[datanodes.size()]; - combinedData = datanodes.toArray(combinedData); - return combinedData; + return toArray(datanodes, DatanodeInfo.class); } @Override // ClientProtocol @@ -2295,7 +2295,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { * @param clazz Class of the values. * @return Array with the values in set. 
*/ - private static <T> T[] toArray(Set<T> set, Class<T> clazz) { + private static <T> T[] toArray(Collection<T> set, Class<T> clazz) { @SuppressWarnings("unchecked") T[] combinedData = (T[]) Array.newInstance(clazz, set.size()); combinedData = set.toArray(combinedData); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a71656c1/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index e8341a2..5014880 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -27,6 +27,7 @@ import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.TEST import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -45,6 +46,7 @@ import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CryptoProtocolVersion; @@ -71,10 +73,11 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import 
org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.io.EnumSetWritable; @@ -83,6 +86,7 @@ import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.test.GenericTestUtils; +import org.codehaus.jettison.json.JSONObject; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -150,7 +154,14 @@ public class TestRouterRpc { cluster.startCluster(); // Start routers with only an RPC service - cluster.addRouterOverrides((new RouterConfigBuilder()).rpc().build()); + Configuration routerConf = new RouterConfigBuilder() + .metrics() + .rpc() + .build(); + // We decrease the DN cache times to make the test faster + routerConf.setTimeDuration( + NamenodeBeanMetrics.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); + cluster.addRouterOverrides(routerConf); cluster.startRouters(); // Register and verify all NNs with all routers @@ -1032,6 +1043,32 @@ public class TestRouterRpc { assertEquals(statsNamenode.toString(), statsRouter.toString()); } + @Test + public void testNamenodeMetrics() throws Exception { + final NamenodeBeanMetrics metrics = + router.getRouter().getNamenodeMetrics(); + final String jsonString0 = metrics.getLiveNodes(); + + // We should have 12 nodes in total + JSONObject jsonObject = new JSONObject(jsonString0); + assertEquals(12, 
jsonObject.names().length()); + + // We should be caching this information + String jsonString1 = metrics.getLiveNodes(); + assertEquals(jsonString0, jsonString1); + + // We wait until the cached value is updated + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return !jsonString0.equals(metrics.getLiveNodes()); + } + }, 500, 5 * 1000); + + // The cache should be updated now + assertNotEquals(jsonString0, metrics.getLiveNodes()); + } + /** * Check the erasure coding policies in the Router and the Namenode. * @return The erasure coding policies. --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org