[ https://issues.apache.org/jira/browse/HDFS-17596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17887743#comment-17887743 ]
ASF GitHub Bot commented on HDFS-17596: --------------------------------------- KeeProMise commented on code in PR #6988: URL: https://github.com/apache/hadoop/pull/6988#discussion_r1792697438 ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java: ########## @@ -791,6 +800,36 @@ <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz) return invokeOnNs(method, clazz, io, nss); } + <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz) + throws IOException { + String nsId = subclusterResolver.getDefaultNamespace(); + // If default Ns is not present return result from first namespace. + Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + // If no namespace is available, throw IOException. + IOException io = new IOException("No namespace available."); + + asyncComplete(null); + if (!nsId.isEmpty()) { + asyncTry(() -> { + rpcClient.invokeSingle(nsId, method, clazz); + }); + + asyncCatch((AsyncCatchFunction<T, IOException>)(res, ioe) -> { + if (!clientProto.isUnavailableSubclusterException(ioe)) { + LOG.debug("{} exception cannot be retried", + ioe.getClass().getSimpleName()); + throw ioe; + } + nss.removeIf(n -> n.getNameserviceId().equals(nsId)); + invokeOnNs(method, clazz, io, nss); Review Comment: Should use invokeOnNsAsync. ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java: ########## @@ -791,6 +800,36 @@ <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz) return invokeOnNs(method, clazz, io, nss); } + <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz) + throws IOException { + String nsId = subclusterResolver.getDefaultNamespace(); + // If default Ns is not present return result from first namespace. + Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + // If no namespace is available, throw IOException. + IOException io = new IOException("No namespace available."); + + asyncComplete(null); Review Comment: Hi, @hfutatzhanghb IMO, asyncComplete(null) is not needed in this place. ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncStoragePolicy.java: ########## @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.namenode.NameNode; + +import java.io.IOException; +import java.util.List; + +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; Review Comment: Remove unused imports. ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java: ########## @@ -824,6 +863,49 @@ <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe, throw ioe; } + <T> T invokeOnNsAsync(RemoteMethod method, Class<T> clazz, IOException ioe, + Set<FederationNamespaceInfo> nss) throws IOException { + if (nss.isEmpty()) { + throw ioe; + } + + asyncComplete(null); + Iterator<FederationNamespaceInfo> nsIterator = nss.iterator(); + asyncForEach(nsIterator, (foreach, fnInfo) -> { + String nsId = fnInfo.getNameserviceId(); + LOG.debug("Invoking {} on namespace {}", method, nsId); + asyncTry(() -> { + rpcClient.invokeSingle(nsId, method, clazz); + asyncApply(result -> { + if (result != null && isExpectedClass(clazz, result)) { + foreach.breakNow(); + return result; + } + return null; + }); + }); + + asyncCatch((AsyncCatchFunction<T, IOException>)(ret, ex) -> { Review Comment: Should use CatchFunction. > [ARR] RouterStoragePolicy supports asynchronous rpc. > ---------------------------------------------------- > > Key: HDFS-17596 > URL: https://issues.apache.org/jira/browse/HDFS-17596 > Project: Hadoop HDFS > Issue Type: Sub-task > Reporter: Jian Zhang > Assignee: farmmamba > Priority: Major > Labels: pull-request-available > > *Describe* > The main new addition is RouterAsyncStoragePolicy, which extends > RouterStoragePolicy so that supports asynchronous rpc. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org