[ 
https://issues.apache.org/jira/browse/HDFS-17596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17887743#comment-17887743
 ] 

ASF GitHub Bot commented on HDFS-17596:
---------------------------------------

KeeProMise commented on code in PR #6988:
URL: https://github.com/apache/hadoop/pull/6988#discussion_r1792697438


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -791,6 +800,36 @@ <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> 
clazz)
     return invokeOnNs(method, clazz, io, nss);
   }
 
+  <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz)
+      throws IOException {
+    String nsId = subclusterResolver.getDefaultNamespace();
+    // If default Ns is not present return result from first namespace.
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    // If no namespace is available, throw IOException.
+    IOException io = new IOException("No namespace available.");
+
+    asyncComplete(null);
+    if (!nsId.isEmpty()) {
+      asyncTry(() -> {
+        rpcClient.invokeSingle(nsId, method, clazz);
+      });
+
+      asyncCatch((AsyncCatchFunction<T, IOException>)(res, ioe) -> {
+        if (!clientProto.isUnavailableSubclusterException(ioe)) {
+          LOG.debug("{} exception cannot be retried",
+              ioe.getClass().getSimpleName());
+          throw ioe;
+        }
+        nss.removeIf(n -> n.getNameserviceId().equals(nsId));
+        invokeOnNs(method, clazz, io, nss);

Review Comment:
   Should use  invokeOnNsAsync.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -791,6 +800,36 @@ <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> 
clazz)
     return invokeOnNs(method, clazz, io, nss);
   }
 
+  <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz)
+      throws IOException {
+    String nsId = subclusterResolver.getDefaultNamespace();
+    // If default Ns is not present return result from first namespace.
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    // If no namespace is available, throw IOException.
+    IOException io = new IOException("No namespace available.");
+
+    asyncComplete(null);

Review Comment:
   Hi, @hfutatzhanghb IMO, asyncComplete(null) is not needed in this place.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncStoragePolicy.java:
##########
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import java.io.IOException;
+import java.util.List;
+
+import static 
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;

Review Comment:
   Remove unused imports.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -824,6 +863,49 @@ <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, 
IOException ioe,
     throw ioe;
   }
 
+  <T> T invokeOnNsAsync(RemoteMethod method, Class<T> clazz, IOException ioe,
+                        Set<FederationNamespaceInfo> nss) throws IOException {
+    if (nss.isEmpty()) {
+      throw ioe;
+    }
+
+    asyncComplete(null);
+    Iterator<FederationNamespaceInfo> nsIterator = nss.iterator();
+    asyncForEach(nsIterator, (foreach, fnInfo) -> {
+      String nsId = fnInfo.getNameserviceId();
+      LOG.debug("Invoking {} on namespace {}", method, nsId);
+      asyncTry(() -> {
+        rpcClient.invokeSingle(nsId, method, clazz);
+        asyncApply(result -> {
+          if (result != null && isExpectedClass(clazz, result)) {
+            foreach.breakNow();
+            return result;
+          }
+          return null;
+        });
+      });
+
+      asyncCatch((AsyncCatchFunction<T, IOException>)(ret, ex) -> {

Review Comment:
   Should use CatchFunction.





> [ARR] RouterStoragePolicy supports asynchronous rpc.
> ----------------------------------------------------
>
>                 Key: HDFS-17596
>                 URL: https://issues.apache.org/jira/browse/HDFS-17596
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>            Reporter: Jian Zhang
>            Assignee: farmmamba
>            Priority: Major
>              Labels: pull-request-available
>
> *Describe*
> The main new addition is RouterAsyncStoragePolicy, which extends 
> RouterStoragePolicy so that supports asynchronous rpc.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to