sunchao commented on a change in pull request #2080:
URL: https://github.com/apache/hadoop/pull/2080#discussion_r444664594
##########
File path:
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
##########
@@ -868,6 +904,50 @@ public HdfsLocatedFileStatus getLocatedFileInfo(String src,
return clientProto.getDatanodeReport(type);
}
+ /**
+ * Get the datanode report from cache.
+ *
+ * @param type Type of the datanode.
+ * @return List of datanodes.
+ * @throws IOException If it cannot get the report.
+ */
+ public DatanodeInfo[] getCachedDatanodeReport(DatanodeReportType type)
Review comment:
nit: this can be package-private?
##########
File path:
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
##########
@@ -1748,4 +1828,58 @@ public void refreshSuperUserGroupsConfiguration() throws IOException {
public String[] getGroupsForUser(String user) throws IOException {
return routerProto.getGroupsForUser(user);
}
-}
\ No newline at end of file
+
+ /**
+ * Deals with loading datanode report into the cache and refresh.
+ */
+ private class DatanodeReportCacheLoader
+ extends CacheLoader<DatanodeReportType, DatanodeInfo[]> {
+
+ private ListeningExecutorService executorService;
+
+ DatanodeReportCacheLoader() {
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
Review comment:
hmm can we just use:
```java
executorService = MoreExecutors.listeningDecorator(
Executors.newSingleThreadExecutor());
```
?
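For what it's worth, the main thing the explicit `ThreadFactoryBuilder` buys over that one-liner is the thread name and the daemon flag. A JDK-only sketch of the trade-off (class name hypothetical; in the PR the executor would additionally be wrapped with `MoreExecutors.listeningDecorator`):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SingleThreadRefresher {
    // Single-thread executor whose worker is a named daemon thread, which is
    // what the PR's ThreadFactoryBuilder provides; the plain
    // Executors.newSingleThreadExecutor() one-liner uses a default,
    // non-daemon, generically named thread instead.
    static final ExecutorService EXEC = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "DatanodeReport-Cache-Reload");
        t.setDaemon(true);
        return t;
    });

    // Returns the name of the worker thread, to show the factory took effect.
    static String currentWorkerName() throws Exception {
        Future<String> f = EXEC.submit(() -> Thread.currentThread().getName());
        return f.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(currentWorkerName());
        EXEC.shutdown();
    }
}
```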
##########
File path:
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
##########
@@ -361,6 +380,23 @@ public RouterRpcServer(Configuration configuration, Router router,
this.nnProto = new RouterNamenodeProtocol(this);
this.clientProto = new RouterClientProtocol(conf, this);
this.routerProto = new RouterUserProtocol(this);
+
+ long dnCacheExpire = conf.getTimeDuration(
+ DN_REPORT_CACHE_EXPIRE,
+ DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS);
+ this.dnCache = CacheBuilder.newBuilder()
+ .build(new DatanodeReportCacheLoader());
+
+ // Actively refresh the dn cache in a configured interval
+ Executors
Review comment:
Hmm, have you considered using
```java
this.dnCache = CacheBuilder.newBuilder()
.refreshAfterWrite(dnCacheExpire, TimeUnit.MILLISECONDS)
.build(new DatanodeReportCacheLoader());
```
This will also refresh the cache automatically. Note that it only refreshes a
key if 1) the entry has become stale, and 2) there is a request for it, so this
saves some calls for the infrequently requested DN report types.
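Roughly, `refreshAfterWrite` behaves like the following hand-rolled sketch (hypothetical class, synchronous and simplified; the real Guava cache also keeps serving the stale value while an asynchronous reload runs):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of refreshAfterWrite semantics: an entry is reloaded only when
// (1) it is older than the expiry AND (2) someone actually requests it.
// Keys that are never read again are never refreshed.
public class StaleOnReadCache<K, V> {
    private static final class Entry<V> {
        final V value;
        final long writtenAtMs;
        Entry(V value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final Map<K, Entry<V>> map = new ConcurrentHashMap<>();
    private final long expireMs;
    private final Function<K, V> loader;
    long loads = 0;  // number of loader invocations, for illustration

    public StaleOnReadCache(long expireMs, Function<K, V> loader) {
        this.expireMs = expireMs;
        this.loader = loader;
    }

    public V get(K key) {
        long now = System.currentTimeMillis();
        Entry<V> e = map.get(key);
        if (e == null || now - e.writtenAtMs > expireMs) {
            loads++;
            e = new Entry<>(loader.apply(key), now);
            map.put(key, e);
        }
        return e.value;
    }

    public static void main(String[] args) {
        StaleOnReadCache<String, String> cache =
            new StaleOnReadCache<>(60_000, k -> k + "-report");
        cache.get("LIVE");
        cache.get("LIVE");  // second read within expiry: no reload
        System.out.println(cache.loads);
    }
}
```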
##########
File path:
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
##########
@@ -1748,4 +1828,58 @@ public void refreshSuperUserGroupsConfiguration() throws IOException {
public String[] getGroupsForUser(String user) throws IOException {
return routerProto.getGroupsForUser(user);
}
-}
\ No newline at end of file
+
+ /**
+ * Deals with loading datanode report into the cache and refresh.
+ */
+ private class DatanodeReportCacheLoader
+ extends CacheLoader<DatanodeReportType, DatanodeInfo[]> {
+
+ private ListeningExecutorService executorService;
+
+ DatanodeReportCacheLoader() {
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("DatanodeReport-Cache-Reload")
+ .setDaemon(true)
+ .build();
+
+ // Only use 1 thread to refresh cache.
+ // With coreThreadCount == maxThreadCount we effectively
+ // create a fixed size thread pool. As allowCoreThreadTimeOut
+ // has been set, all threads will die after 60 seconds of non use.
+ ThreadPoolExecutor parentExecutor = new ThreadPoolExecutor(
+ 1,
+ 1,
+ 60,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ threadFactory);
+ parentExecutor.allowCoreThreadTimeOut(true);
+ executorService = MoreExecutors.listeningDecorator(parentExecutor);
+ }
+
+ @Override
+ public DatanodeInfo[] load(DatanodeReportType type) throws Exception {
+ return getCachedDatanodeReportImpl(type);
+ }
+
+ /**
+ * Override the reload method to provide an asynchronous implementation,
+ * so that the query will not be slowed down by the cache refresh. It
+ * will return the old cache value and schedule a background refresh.
+ */
+ @Override
+ public ListenableFuture<DatanodeInfo[]> reload(
+ final DatanodeReportType type, DatanodeInfo[] oldValue)
+ throws Exception {
+ ListenableFuture<DatanodeInfo[]> listenableFuture =
Review comment:
nit: the local variable `listenableFuture` is redundant; you can just return
the result of the `submit` call directly.
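That is, something like the following sketch, with plain JDK `Future` standing in for `ListenableFuture` so the example is self-contained (names hypothetical):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DirectReturn {
    static final ExecutorService EXEC = Executors.newSingleThreadExecutor();

    // Instead of assigning the future to a local variable and then returning
    // it, return the submit(...) result directly.
    static Future<String> reload(String type) {
        return EXEC.submit(() -> "report-for-" + type);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(reload("LIVE").get());
        EXEC.shutdown();
    }
}
```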
##########
File path:
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
##########
@@ -868,6 +904,50 @@ public HdfsLocatedFileStatus getLocatedFileInfo(String src,
return clientProto.getDatanodeReport(type);
}
+ /**
+ * Get the datanode report from cache.
+ *
+ * @param type Type of the datanode.
+ * @return List of datanodes.
+ * @throws IOException If it cannot get the report.
+ */
+ public DatanodeInfo[] getCachedDatanodeReport(DatanodeReportType type)
+ throws IOException {
+ try {
+ DatanodeInfo[] dns = this.dnCache.get(type);
+ if (dns == null) {
+ LOG.debug("Get null DN report from cache");
+ dns = getCachedDatanodeReportImpl(type);
+ this.dnCache.put(type, dns);
+ }
+ return dns;
+ } catch (ExecutionException e) {
+ LOG.error("Cannot get the DN report for {}", type, e);
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else {
+ throw new IOException(cause);
+ }
+ }
+ }
+
+ private DatanodeInfo[] getCachedDatanodeReportImpl
+ (final DatanodeReportType type) throws IOException{
Review comment:
nit: missing space before `{` (`throws IOException {`)
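Aside: the `ExecutionException` handling in the `getCachedDatanodeReport` hunk above follows a common unwrap-or-wrap pattern. A self-contained sketch of it (class and method names hypothetical):

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UnwrapDemo {
    static final ExecutorService EXEC = Executors.newSingleThreadExecutor();

    // Same pattern as getCachedDatanodeReport: if the cause of the
    // ExecutionException is already an IOException, rethrow it as-is;
    // otherwise wrap the cause in a new IOException.
    static String loadOrThrow() throws IOException {
        try {
            return EXEC.submit((Callable<String>) () -> {
                throw new IOException("cannot get the report");
            }).get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            throw (cause instanceof IOException)
                ? (IOException) cause
                : new IOException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    public static void main(String[] args) {
        try {
            loadOrThrow();
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
        EXEC.shutdown();
    }
}
```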
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]