[ 
https://issues.apache.org/jira/browse/HDFS-17601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17888343#comment-17888343
 ] 

ASF GitHub Bot commented on HDFS-17601:
---------------------------------------

KeeProMise commented on code in PR #7108:
URL: https://github.com/apache/hadoop/pull/7108#discussion_r1795729948


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -824,6 +874,60 @@ <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe,
     throw ioe;
   }
 
+  /**
+   * Invoke the method sequentially on available namespaces,
+   * throw no namespace available exception, if no namespaces are available.
+   * Asynchronous version of invokeOnNs method.
+   * @param method the remote method.
+   * @param clazz  Class for the return type.
+   * @param ioe    IOException .
+   * @param nss    List of name spaces in the federation
+   * @return the response received after invoking method.
+   * @throws IOException
+   */
+  <T> T invokeOnNsAsync(RemoteMethod method, Class<T> clazz, IOException ioe,
+      Set<FederationNamespaceInfo> nss) throws IOException {
+    if (nss.isEmpty()) {
+      throw ioe;
+    }
+
+    asyncComplete(null);
+    Iterator<FederationNamespaceInfo> nsIterator = nss.iterator();
+    asyncForEach(nsIterator, (foreach, fnInfo) -> {
+      String nsId = fnInfo.getNameserviceId();
+      LOG.debug("Invoking {} on namespace {}", method, nsId);
+      asyncTry(() -> {
+        rpcClient.invokeSingle(nsId, method, clazz);
+        asyncApply(result -> {
+          if (result != null && isExpectedClass(clazz, result)) {
+            foreach.breakNow();
+            return result;
+          }
+          return null;
+        });
+      });
+
+      asyncCatch((AsyncCatchFunction<T, IOException>)(ret, ex) -> {

Review Comment:
   This should use CatchFunction instead of AsyncCatchFunction.
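
   For context, here is a minimal sketch of the distinction this comment seems to draw. It uses plain java.util.concurrent.CompletableFuture, not the RBF async utilities. It assumes that a CatchFunction-style handler returns its recovery value synchronously, while an AsyncCatchFunction-style handler starts further asynchronous work. That reading, and the names CatchStyles, syncCatch, asyncCatch and unwrap, are illustrative assumptions and are not taken from the PR.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

// Hypothetical illustration only; none of these names exist in Hadoop.
class CatchStyles {
  /** Strip the CompletionException wrapper that dependent stages may add. */
  static Throwable unwrap(Throwable t) {
    return (t instanceof CompletionException && t.getCause() != null)
        ? t.getCause() : t;
  }

  /** Synchronous style: the handler returns the recovery value directly. */
  static CompletableFuture<String> syncCatch(CompletableFuture<String> call) {
    return call.exceptionally(t -> "fallback:" + unwrap(t).getMessage());
  }

  /** Asynchronous style: the handler returns a future, which is flattened. */
  static CompletableFuture<String> asyncCatch(
      CompletableFuture<String> call,
      Function<Throwable, CompletableFuture<String>> recovery) {
    return call
        .<CompletableFuture<String>>handle((res, t) ->
            t == null
                ? CompletableFuture.completedFuture(res)
                : recovery.apply(unwrap(t)))
        .thenCompose(f -> f);
  }

  public static void main(String[] args) {
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IOException("ns0 down"));
    System.out.println(syncCatch(failed).join());
    System.out.println(asyncCatch(failed,
        t -> CompletableFuture.supplyAsync(() -> "retried")).join());
  }
}
```

   If the handler only inspects the exception and returns or rethrows, the synchronous form is enough. The asynchronous form is only needed when the handler itself issues another asynchronous call.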



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -791,6 +801,46 @@ <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
     return invokeOnNs(method, clazz, io, nss);
   }
 
+  /**
+   * Invokes the method at default namespace, if default namespace is not
+   * available then at the other available namespaces.
+   * If the namespace is unavailable, retry with other namespaces.
+   * Asynchronous version of invokeAtAvailableNs method.
+   * @param <T> expected return type.
+   * @param method the remote method.
+   * @return the response received after invoking method.
+   * @throws IOException
+   */
+  <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz)
+      throws IOException {
+    String nsId = subclusterResolver.getDefaultNamespace();
+    // If default Ns is not present return result from first namespace.
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    // If no namespace is available, throw IOException.
+    IOException io = new IOException("No namespace available.");
+
+    asyncComplete(null);
+    if (!nsId.isEmpty()) {
+      asyncTry(() -> {
+        rpcClient.invokeSingle(nsId, method, clazz);
+      });
+
+      asyncCatch((AsyncCatchFunction<T, IOException>)(res, ioe) -> {
+        if (!clientProto.isUnavailableSubclusterException(ioe)) {
+          LOG.debug("{} exception cannot be retried",
+              ioe.getClass().getSimpleName());
+          throw ioe;
+        }
+        nss.removeIf(n -> n.getNameserviceId().equals(nsId));
+        invokeOnNs(method, clazz, io, nss);

Review Comment:
   Hi @hfutatzhanghb, this should call invokeOnNsAsync instead of invokeOnNs.
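
   To illustrate why the retry path would use the asynchronous variant, here is a rough sketch of a sequential, non-blocking fallback over namespaces, written with plain CompletableFuture. It is not the PR's implementation and does not use the RBF async utilities. NsFallbackSketch, invokeOnNsAsyncSketch and the invoke function are hypothetical, and the logic is simplified: any failure or null result moves on to the next namespace, and the supplied IOException is reported once all namespaces are exhausted.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical illustration only; none of these names exist in Hadoop.
class NsFallbackSketch {
  /**
   * Try each namespace in turn without blocking the calling thread.
   * Completes with the first non-null result, or fails with noNsAvailable.
   */
  static <T> CompletableFuture<T> invokeOnNsAsyncSketch(
      Iterator<String> nsIds,
      Function<String, CompletableFuture<T>> invoke,
      IOException noNsAvailable) {
    if (!nsIds.hasNext()) {
      CompletableFuture<T> failed = new CompletableFuture<>();
      failed.completeExceptionally(noNsAvailable);
      return failed;
    }
    String nsId = nsIds.next();
    return invoke.apply(nsId)
        .<CompletableFuture<T>>handle((res, t) -> {
          if (t == null && res != null) {
            // Success: stop iterating, like breaking out of the loop.
            return CompletableFuture.completedFuture(res);
          }
          // Failure or empty result: chain the next namespace.
          return invokeOnNsAsyncSketch(nsIds, invoke, noNsAvailable);
        })
        .thenCompose(f -> f);
  }

  public static void main(String[] args) {
    // Simulated remote call: ns0 is unavailable, ns1 answers.
    Function<String, CompletableFuture<String>> invoke = ns -> {
      CompletableFuture<String> f = new CompletableFuture<>();
      if (ns.equals("ns0")) {
        f.completeExceptionally(new IOException(ns + " unavailable"));
      } else {
        f.complete("ok-" + ns);
      }
      return f;
    };
    Iterator<String> nss = Arrays.asList("ns0", "ns1").iterator();
    IOException io = new IOException("No namespace available.");
    System.out.println(invokeOnNsAsyncSketch(nss, invoke, io).join());
  }
}
```

   In this sketch the fallback stays part of the same future chain, so the caller observes a single result. A blocking fallback invoked from inside the handler would not compose in the same way.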





> [ARR] RouterRpcServer supports asynchronous rpc.
> ------------------------------------------------
>
>                 Key: HDFS-17601
>                 URL: https://issues.apache.org/jira/browse/HDFS-17601
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>          Components: rbf
>            Reporter: farmmamba
>            Assignee: farmmamba
>            Priority: Major
>              Labels: pull-request-available
>
> RouterRpcServer supports asynchronous RPC. The following methods need to be
> transformed to asynchronous versions:
>  * {{invokeOnNsAsync}}
>  * {{invokeAtAvailableNsAsync}}
>  * {{getExistingLocationAsync}}
>  * {{getDatanodeReportAsync}}
>  * {{getDatanodeStorageReportMapAsync}}
>  * {{getSlowDatanodeReportAsync}}
>  * {{getCreateLocationAsync}}


