[ https://issues.apache.org/jira/browse/HDFS-17640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17909007#comment-17909007 ]
ASF GitHub Bot commented on HDFS-17640:
---------------------------------------

Hexiaoqiao commented on code in PR #7188:
URL: https://github.com/apache/hadoop/pull/7188#discussion_r1899904630


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java:
##########
@@ -0,0 +1,1083 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
+import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
+import org.apache.hadoop.hdfs.server.federation.router.NoLocationException;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.updateMountPointStatus;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.getCompletableFuture;
+
+/**
+ * Module that implements all the async RPC calls in {@link ClientProtocol} in the
+ * {@link RouterRpcServer}.
+ */
+public class RouterAsyncClientProtocol extends RouterClientProtocol {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterAsyncClientProtocol.class.getName());
+
+  private final RouterRpcServer rpcServer;
+  private final RouterRpcClient rpcClient;
+  private final RouterFederationRename rbfRename;
+  private final FileSubclusterResolver subclusterResolver;
+  private final ActiveNamenodeResolver namenodeResolver;
+  /** If it requires response from all subclusters. */
+  private final boolean allowPartialList;
+  /** Time out when getting the mount statistics. */
+  private long mountStatusTimeOut;
+  /** Identifier for the super user. */
+  private String superUser;
+  /** Identifier for the super group. */
+  private final String superGroup;
+  /**
+   * Caching server defaults so as to prevent redundant calls to namenode,
+   * similar to DFSClient, caching saves efforts when router connects
+   * to multiple clients.
+   */
+  private volatile FsServerDefaults serverDefaults;
+
+  public RouterAsyncClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
+    super(conf, rpcServer);
+    this.rpcServer = rpcServer;
+    this.rpcClient = rpcServer.getRPCClient();
+    this.rbfRename = getRbfRename();
+    this.subclusterResolver = getSubclusterResolver();
+    this.namenodeResolver = getNamenodeResolver();
+    this.allowPartialList = isAllowPartialList();
+    this.mountStatusTimeOut = getMountStatusTimeOut();
+    this.superUser = getSuperUser();
+    this.superGroup = getSuperGroup();
+  }
+
+  @Override
+  public FsServerDefaults getServerDefaults() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+    long now = Time.monotonicNow();
+    if ((serverDefaults == null) || (now - getServerDefaultsLastUpdate()
+        > getServerDefaultsValidityPeriod())) {
+      RemoteMethod method = new RemoteMethod("getServerDefaults");
+      rpcServer.invokeAtAvailableNsAsync(method, FsServerDefaults.class);
+      asyncApply(o -> {
+        serverDefaults = (FsServerDefaults) o;
+        setServerDefaultsLastUpdate(now);
+        return serverDefaults;
+      });
+    } else {
+      asyncComplete(serverDefaults);
+    }
+    return asyncReturn(FsServerDefaults.class);
+  }
+
+  @Override
+  public HdfsFileStatus create(String src, FsPermission masked,
+      String clientName, EnumSetWritable<CreateFlag> flag,
+      boolean createParent, short replication, long blockSize,
+      CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
+      String storagePolicy) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    if (createParent && rpcServer.isPathAll(src)) {
+      int index = src.lastIndexOf(Path.SEPARATOR);
+      String parent = src.substring(0, index);
+      LOG.debug("Creating {} requires creating parent {}", src, parent);
+      FsPermission parentPermissions = getParentPermission(masked);
+      mkdirs(parent, parentPermissions, createParent);
+      asyncApply((ApplyFunction<Boolean, Boolean>) success -> {
+        if (!success) {
+          // This shouldn't happen as mkdirs returns true or exception
+          LOG.error("Couldn't create parents for {}", src);
+        }
+        return success;
+      });
+    }
+
+    RemoteMethod method = new RemoteMethod("create",
+        new Class<?>[] {String.class, FsPermission.class, String.class,
+            EnumSetWritable.class, boolean.class, short.class,
+            long.class, CryptoProtocolVersion[].class,
+            String.class, String.class},
+        new RemoteParam(), masked, clientName, flag, createParent,
+        replication, blockSize, supportedVersions, ecPolicyName, storagePolicy);
+    final List<RemoteLocation> locations =
+        rpcServer.getLocationsForPath(src, true);
+    final RemoteLocation[] createLocation = new RemoteLocation[1];
+    asyncTry(() -> {
+      rpcServer.getCreateLocationAsync(src, locations);
+      asyncApply((AsyncApplyFunction<RemoteLocation, Object>) remoteLocation -> {
+        createLocation[0] = remoteLocation;
+        rpcClient.invokeSingle(remoteLocation, method, HdfsFileStatus.class);
+        asyncApply((ApplyFunction<HdfsFileStatus, Object>) status -> {
+          status.setNamespace(remoteLocation.getNameserviceId());
+          return status;
+        });
+      });
+    });
+    asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> {
+      final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
+          method, src, ioe, createLocation[0], locations);
+      rpcClient.invokeSequential(
+          newLocations, method, HdfsFileStatus.class, null);
+    }, IOException.class);
+
+    return asyncReturn(HdfsFileStatus.class);
+  }
+
+  @Override
+  public LastBlockWithStatus append(
+      String src, String clientName,
+      EnumSetWritable<CreateFlag> flag) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("append",
+        new Class<?>[] {String.class, String.class, EnumSetWritable.class},
+        new RemoteParam(), clientName, flag);
+    rpcClient.invokeSequential(method, locations, LastBlockWithStatus.class, null);
+    asyncApply((ApplyFunction<RemoteResult, LastBlockWithStatus>) result -> {
+      LastBlockWithStatus lbws = (LastBlockWithStatus) result.getResult();
+      lbws.getFileStatus().setNamespace(result.getLocation().getNameserviceId());
+      return lbws;
+    });
+    return asyncReturn(LastBlockWithStatus.class);
+  }
+
+  @Deprecated
+  @Override
+  public boolean rename(final String src, final String dst)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    final List<RemoteLocation> srcLocations =
+        rpcServer.getLocationsForPath(src, true, false);
+    final List<RemoteLocation> dstLocations =
+        rpcServer.getLocationsForPath(dst, false, false);
+    // srcLocations may be trimmed by getRenameDestinations()
+    final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
+    RemoteParam dstParam = getRenameDestinations(locs, dstLocations);
+    if (locs.isEmpty()) {
+      asyncComplete(
+          rbfRename.routerFedRename(src, dst, srcLocations, dstLocations));
+      return asyncReturn(Boolean.class);
+    }
+    RemoteMethod method = new RemoteMethod("rename",
+        new Class<?>[] {String.class, String.class},
+        new RemoteParam(), dstParam);
+    isMultiDestDirectory(src);

Review Comment:
   Duplicate name here?


> [ARR] RouterClientProtocol supports asynchronous rpc.
> -----------------------------------------------------
>
>                 Key: HDFS-17640
>                 URL: https://issues.apache.org/jira/browse/HDFS-17640
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>          Components: rbf
>            Reporter: farmmamba
>            Assignee: farmmamba
>            Priority: Major
>              Labels: pull-request-available
>
> RouterClientProtocol should support asynchronous rpc.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org
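
P.S. for readers following the diff: every method in the patch is built from the
AsyncUtil helpers (asyncComplete, asyncApply, asyncCatch, asyncReturn), which
stage work on a pending CompletableFuture rather than computing a value inline,
and the method returns a typed placeholder. Below is a minimal, self-contained
sketch of that pattern; the thread-local plumbing and all names here
(AsyncPatternSketch, describe) are illustrative stand-ins written for this
note, not Hadoop's actual AsyncUtil API.

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
 * Toy model (not Hadoop code) of the async-return pattern in the diff:
 * helpers chain work onto a CompletableFuture held in a ThreadLocal, and
 * the RPC method returns a placeholder while the caller later unwraps the
 * future itself.
 */
public class AsyncPatternSketch {

  // Pending result for the current handler thread.
  private static final ThreadLocal<CompletableFuture<Object>> CUR =
      new ThreadLocal<>();

  /** Seed the chain with an already-known value (cf. asyncComplete). */
  static void asyncComplete(Object value) {
    CUR.set(CompletableFuture.completedFuture(value));
  }

  /** Chain a transformation onto the pending future (cf. asyncApply). */
  @SuppressWarnings("unchecked")
  static <T, R> void asyncApply(Function<T, R> fn) {
    CUR.set(CUR.get().thenApply(v -> (Object) fn.apply((T) v)));
  }

  /** Typed placeholder "return"; the real value arrives via the future. */
  static <T> T asyncReturn(Class<T> clazz) {
    return null;
  }

  /** Hand the pending future to whoever will actually wait on it. */
  static CompletableFuture<Object> getCompletableFuture() {
    return CUR.get();
  }

  // Shaped like getServerDefaults() in the diff: stage async work, then
  // return a placeholder instead of the computed value.
  static String describe(int port) {
    asyncComplete(port);
    asyncApply((Integer p) -> "server defaults cached for port " + p);
    return asyncReturn(String.class);
  }

  public static void main(String[] args) throws Exception {
    String placeholder = describe(8020);  // null; the value is in the future
    System.out.println(getCompletableFuture().get());
  }
}

This placeholder style is what lets methods such as getServerDefaults(),
create(), and rename() in the diff keep ClientProtocol's synchronous
signatures while the real result is delivered through the thread's
CompletableFuture.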