This is an automated email from the ASF dual-hosted git repository. hexiaoqiao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit e75bec659772e4d4f4584a2da16d62ea509ae411 Author: hfutatzhanghb <hfutzhan...@163.com> AuthorDate: Tue Dec 31 15:20:47 2024 +0800 HDFS-17640.[ARR] RouterClientProtocol supports asynchronous rpc. (#7188) Co-authored-by: Jian Zhang <keeprom...@apache.org> --- .../federation/router/RouterClientProtocol.java | 133 ++- .../federation/router/RouterFederationRename.java | 2 +- .../server/federation/router/RouterRpcClient.java | 2 +- .../server/federation/router/RouterRpcServer.java | 20 +- .../router/async/RouterAsyncClientProtocol.java | 1083 ++++++++++++++++++++ .../router/async/RouterAsyncProtocolTestBase.java | 6 +- .../async/TestRouterAsyncClientProtocol.java | 144 +++ 7 files changed, 1352 insertions(+), 38 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index d5064821905..cab4fad1909 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -85,6 +85,10 @@ import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException; +import org.apache.hadoop.hdfs.server.federation.router.async.AsyncErasureCoding; +import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncCacheAdmin; +import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncSnapshot; +import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncStoragePolicy; import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -166,7 +170,7 @@ public class RouterClientProtocol implements ClientProtocol { /** Router security manager to handle token operations. */ private RouterSecurityManager securityManager = null; - RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) { + public RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) { this.rpcServer = rpcServer; this.rpcClient = rpcServer.getRPCClient(); this.subclusterResolver = rpcServer.getSubclusterResolver(); @@ -194,10 +198,17 @@ public class RouterClientProtocol implements ClientProtocol { this.superGroup = conf.get( DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); - this.erasureCoding = new ErasureCoding(rpcServer); - this.storagePolicy = new RouterStoragePolicy(rpcServer); - this.snapshotProto = new RouterSnapshot(rpcServer); - this.routerCacheAdmin = new RouterCacheAdmin(rpcServer); + if (rpcServer.isAsync()) { + this.erasureCoding = new AsyncErasureCoding(rpcServer); + this.storagePolicy = new RouterAsyncStoragePolicy(rpcServer); + this.snapshotProto = new RouterAsyncSnapshot(rpcServer); + this.routerCacheAdmin = new RouterAsyncCacheAdmin(rpcServer); + } else { + this.erasureCoding = new ErasureCoding(rpcServer); + this.storagePolicy = new RouterStoragePolicy(rpcServer); + this.snapshotProto = new RouterSnapshot(rpcServer); + this.routerCacheAdmin = new RouterCacheAdmin(rpcServer); + } this.securityManager = rpcServer.getRouterSecurityManager(); this.rbfRename = new RouterFederationRename(rpcServer, conf); this.defaultNameServiceEnabled = conf.getBoolean( @@ -347,7 +358,7 @@ protected static boolean isUnavailableSubclusterException( * @throws IOException If this path is not fault tolerant or the exception * should not be retried (e.g., NSQuotaExceededException). */ - private List<RemoteLocation> checkFaultTolerantRetry( + protected List<RemoteLocation> checkFaultTolerantRetry( final RemoteMethod method, final String src, final IOException ioe, final RemoteLocation excludeLoc, final List<RemoteLocation> locations) throws IOException { @@ -820,7 +831,7 @@ public void renewLease(String clientName, List<String> namespaces) /** * For {@link #getListing(String,byte[],boolean) GetLisiting} to sort results. */ - private static class GetListingComparator + protected static class GetListingComparator implements Comparator<byte[]>, Serializable { @Override public int compare(byte[] o1, byte[] o2) { @@ -831,6 +842,10 @@ public int compare(byte[] o1, byte[] o2) { private static GetListingComparator comparator = new GetListingComparator(); + public static GetListingComparator getComparator() { + return comparator; + } + @Override public DirectoryListing getListing(String src, byte[] startAfter, boolean needLocation) throws IOException { @@ -1104,7 +1119,7 @@ public DatanodeStorageReport[] getDatanodeStorageReport( return mergeDtanodeStorageReport(dnSubcluster); } - private DatanodeStorageReport[] mergeDtanodeStorageReport( + protected DatanodeStorageReport[] mergeDtanodeStorageReport( Map<String, DatanodeStorageReport[]> dnSubcluster) { // Avoid repeating machines in multiple subclusters Map<String, DatanodeStorageReport> datanodesMap = new LinkedHashMap<>(); @@ -1335,20 +1350,23 @@ Map<String, List<RemoteLocation>> getAllLocations(String path) throws IOExceptio } /** - * Get all the locations of the path for {@link this#getContentSummary(String)}. + * Get all the locations of the path for {@link RouterClientProtocol#getContentSummary(String)}. * For example, there are some mount points: - * /a -> ns0 -> /a - * /a/b -> ns0 -> /a/b - * /a/b/c -> ns1 -> /a/b/c + * <p> + * /a - [ns0 - /a] + * /a/b - [ns0 - /a/b] + * /a/b/c - [ns1 - /a/b/c] + * </p> * When the path is '/a', the result of locations should be * [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')] * When the path is '/b', will throw NoLocationException. + * * @param path the path to get content summary * @return one list contains all the remote location - * @throws IOException + * @throws IOException if an I/O error occurs */ @VisibleForTesting - List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException { + protected List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException { // Try to get all the locations of the path. final Map<String, List<RemoteLocation>> ns2Locations = getAllLocations(path); if (ns2Locations.isEmpty()) { @@ -2039,7 +2057,7 @@ public HAServiceProtocol.HAServiceState getHAServiceState() { * replacement value. * @throws IOException If the dst paths could not be determined. */ - private RemoteParam getRenameDestinations( + protected RemoteParam getRenameDestinations( final List<RemoteLocation> srcLocations, final List<RemoteLocation> dstLocations) throws IOException { @@ -2087,7 +2105,7 @@ private RemoteLocation getFirstMatchingLocation(RemoteLocation location, * @param summaries Collection of individual summaries. * @return Aggregated content summary. */ - private ContentSummary aggregateContentSummary( + protected ContentSummary aggregateContentSummary( Collection<ContentSummary> summaries) { if (summaries.size() == 1) { return summaries.iterator().next(); @@ -2142,7 +2160,7 @@ private ContentSummary aggregateContentSummary( * everywhere. * @throws IOException If all the locations throw an exception. */ - private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations, + protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations, final RemoteMethod method) throws IOException { return getFileInfoAll(locations, method, -1); } @@ -2157,7 +2175,7 @@ private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations, * everywhere. * @throws IOException If all the locations throw an exception. */ - private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations, + protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations, final RemoteMethod method, long timeOutMs) throws IOException { // Get the file info from everybody @@ -2186,12 +2204,11 @@ private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations, /** * Get the permissions for the parent of a child with given permissions. - * Add implicit u+wx permission for parent. This is based on - * @{FSDirMkdirOp#addImplicitUwx}. + * Add implicit u+wx permission for parent. This is based on FSDirMkdirOp#addImplicitUwx. * @param mask The permission mask of the child. * @return The permission mask of the parent. */ - private static FsPermission getParentPermission(final FsPermission mask) { + protected static FsPermission getParentPermission(final FsPermission mask) { FsPermission ret = new FsPermission( mask.getUserAction().or(FsAction.WRITE_EXECUTE), mask.getGroupAction(), @@ -2208,7 +2225,7 @@ private static FsPermission getParentPermission(final FsPermission mask) { * @return New HDFS file status representing a mount point. */ @VisibleForTesting - HdfsFileStatus getMountPointStatus( + protected HdfsFileStatus getMountPointStatus( String name, int childrenNum, long date) { return getMountPointStatus(name, childrenNum, date, true); } @@ -2223,7 +2240,7 @@ HdfsFileStatus getMountPointStatus( * @return New HDFS file status representing a mount point. */ @VisibleForTesting - HdfsFileStatus getMountPointStatus( + protected HdfsFileStatus getMountPointStatus( String name, int childrenNum, long date, boolean setPath) { long modTime = date; long accessTime = date; @@ -2300,7 +2317,7 @@ HdfsFileStatus getMountPointStatus( * @param path Name of the path to start checking dates from. * @return Map with the modification dates for all sub-entries. */ - private Map<String, Long> getMountPointDates(String path) { + protected Map<String, Long> getMountPointDates(String path) { Map<String, Long> ret = new TreeMap<>(); if (subclusterResolver instanceof MountTableResolver) { try { @@ -2361,9 +2378,15 @@ private long getModifiedTime(Map<String, Long> ret, String path, } /** - * Get listing on remote locations. + * Get a partial listing of the indicated directory. + * + * @param src the directory name + * @param startAfter the name to start after + * @param needLocation if blockLocations need to be returned + * @return a partial listing starting after startAfter + * @throws IOException if other I/O error occurred */ - private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt( + protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt( String src, byte[] startAfter, boolean needLocation) throws IOException { try { List<RemoteLocation> locations = @@ -2400,9 +2423,9 @@ private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt( * @param startAfter starting listing from client, used to define listing * start boundary * @param remainingEntries how many entries left from subcluster - * @return + * @return true if should add mount point, otherwise false; */ - private static boolean shouldAddMountPoint( + protected static boolean shouldAddMountPoint( byte[] mountPoint, byte[] lastEntry, byte[] startAfter, int remainingEntries) { if (comparator.compare(mountPoint, startAfter) > 0 && @@ -2425,7 +2448,7 @@ private static boolean shouldAddMountPoint( * @throws IOException if unable to get the file status. */ @VisibleForTesting - boolean isMultiDestDirectory(String src) throws IOException { + protected boolean isMultiDestDirectory(String src) throws IOException { try { if (rpcServer.isPathAll(src)) { List<RemoteLocation> locations; @@ -2449,4 +2472,56 @@ boolean isMultiDestDirectory(String src) throws IOException { public int getRouterFederationRenameCount() { return rbfRename.getRouterFederationRenameCount(); } + + public RouterRpcServer getRpcServer() { + return rpcServer; + } + + public RouterRpcClient getRpcClient() { + return rpcClient; + } + + public FileSubclusterResolver getSubclusterResolver() { + return subclusterResolver; + } + + public ActiveNamenodeResolver getNamenodeResolver() { + return namenodeResolver; + } + + public long getServerDefaultsLastUpdate() { + return serverDefaultsLastUpdate; + } + + public long getServerDefaultsValidityPeriod() { + return serverDefaultsValidityPeriod; + } + + public boolean isAllowPartialList() { + return allowPartialList; + } + + public long getMountStatusTimeOut() { + return mountStatusTimeOut; + } + + public String getSuperUser() { + return superUser; + } + + public String getSuperGroup() { + return superGroup; + } + + public RouterStoragePolicy getStoragePolicy() { + return storagePolicy; + } + + public void setServerDefaultsLastUpdate(long serverDefaultsLastUpdate) { + this.serverDefaultsLastUpdate = serverDefaultsLastUpdate; + } + + public RouterFederationRename getRbfRename() { + return rbfRename; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java index aafb685b886..772e7257888 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java @@ -93,7 +93,7 @@ public RouterFederationRename(RouterRpcServer rpcServer, Configuration conf) { * @throws IOException if rename fails. * @return true if rename succeeds. */ - boolean routerFedRename(final String src, final String dst, + public boolean routerFedRename(final String src, final String dst, final List<RemoteLocation> srcLocations, final List<RemoteLocation> dstLocations) throws IOException { if (!rpcServer.isEnableRenameAcrossNamespace()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index e07de092dd7..c7c3699f33e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1969,7 +1969,7 @@ protected boolean isObserverReadEligible(String nsId, Method method) { * @param nsId namespaceID * @return whether the 'namespace' has observer reads enabled. */ - boolean isNamespaceObserverReadEligible(String nsId) { + public boolean isNamespaceObserverReadEligible(String nsId) { return observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 39a50d4e3a6..0c1d3dfbdec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -75,7 +75,11 @@ import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil; +import org.apache.hadoop.hdfs.server.federation.router.async.AsyncQuota; +import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncClientProtocol; +import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncNamenodeProtocol; import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncRpcClient; +import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncUserProtocol; import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction; import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction; import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction; @@ -288,6 +292,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, * @param fileResolver File resolver to resolve file paths to subclusters. * @throws IOException If the RPC server could not be created. */ + @SuppressWarnings("checkstyle:MethodLength") public RouterRpcServer(Configuration conf, Router router, ActiveNamenodeResolver nnResolver, FileSubclusterResolver fileResolver) throws IOException { @@ -424,14 +429,19 @@ public RouterRpcServer(Configuration conf, Router router, if (this.enableAsync) { this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router, this.namenodeResolver, this.rpcMonitor, routerStateIdContext); + this.clientProto = new RouterAsyncClientProtocol(conf, this); + this.nnProto = new RouterAsyncNamenodeProtocol(this); + this.routerProto = new RouterAsyncUserProtocol(this); + this.quotaCall = new AsyncQuota(this.router, this); } else { this.rpcClient = new RouterRpcClient(this.conf, this.router, this.namenodeResolver, this.rpcMonitor, routerStateIdContext); + this.clientProto = new RouterClientProtocol(conf, this); + this.nnProto = new RouterNamenodeProtocol(this); + this.routerProto = new RouterUserProtocol(this); + this.quotaCall = new Quota(this.router, this); } - this.nnProto = new RouterNamenodeProtocol(this); - this.quotaCall = new Quota(this.router, this); - this.clientProto = new RouterClientProtocol(conf, this); - this.routerProto = new RouterUserProtocol(this); + long dnCacheExpire = conf.getTimeDuration( DN_REPORT_CACHE_EXPIRE, DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS); @@ -2193,7 +2203,7 @@ public FederationRPCMetrics getRPCMetrics() { * @param path Path to check. * @return If a path should be in all subclusters. */ - boolean isPathAll(final String path) { + public boolean isPathAll(final String path) { MountTable entry = getMountTable(path); return entry != null && entry.isAll(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java new file mode 100644 index 00000000000..ae44f7aaf1d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java @@ -0,0 +1,1083 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; +import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; +import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; +import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException; +import org.apache.hadoop.hdfs.server.federation.router.NoLocationException; +import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; +import org.apache.hadoop.hdfs.server.federation.router.RemoteParam; +import org.apache.hadoop.hdfs.server.federation.router.RemoteResult; +import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol; +import org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.updateMountPointStatus; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.getCompletableFuture; + +/** + * Module that implements all the async RPC calls in {@link ClientProtocol} in the + * {@link RouterRpcServer}. + */ +public class RouterAsyncClientProtocol extends RouterClientProtocol { + private static final Logger LOG = + LoggerFactory.getLogger(RouterAsyncClientProtocol.class.getName()); + + private final RouterRpcServer rpcServer; + private final RouterRpcClient rpcClient; + private final RouterFederationRename rbfRename; + private final FileSubclusterResolver subclusterResolver; + private final ActiveNamenodeResolver namenodeResolver; + /** If it requires response from all subclusters. */ + private final boolean allowPartialList; + /** Time out when getting the mount statistics. */ + private long mountStatusTimeOut; + /** Identifier for the super user. */ + private String superUser; + /** Identifier for the super group. */ + private final String superGroup; + /** + * Caching server defaults so as to prevent redundant calls to namenode, + * similar to DFSClient, caching saves efforts when router connects + * to multiple clients. + */ + private volatile FsServerDefaults serverDefaults; + + public RouterAsyncClientProtocol(Configuration conf, RouterRpcServer rpcServer) { + super(conf, rpcServer); + this.rpcServer = rpcServer; + this.rpcClient = rpcServer.getRPCClient(); + this.rbfRename = getRbfRename(); + this.subclusterResolver = getSubclusterResolver(); + this.namenodeResolver = getNamenodeResolver(); + this.allowPartialList = isAllowPartialList(); + this.mountStatusTimeOut = getMountStatusTimeOut(); + this.superUser = getSuperUser(); + this.superGroup = getSuperGroup(); + } + + @Override + public FsServerDefaults getServerDefaults() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + long now = Time.monotonicNow(); + if ((serverDefaults == null) || (now - getServerDefaultsLastUpdate() + > getServerDefaultsValidityPeriod())) { + RemoteMethod method = new RemoteMethod("getServerDefaults"); + rpcServer.invokeAtAvailableNsAsync(method, FsServerDefaults.class); + asyncApply(o -> { + serverDefaults = (FsServerDefaults) o; + setServerDefaultsLastUpdate(now); + return serverDefaults; + }); + } else { + asyncComplete(serverDefaults); + } + return asyncReturn(FsServerDefaults.class); + } + + @Override + public HdfsFileStatus create(String src, FsPermission masked, + String clientName, EnumSetWritable<CreateFlag> flag, + boolean createParent, short replication, long blockSize, + CryptoProtocolVersion[] supportedVersions, String ecPolicyName, + String storagePolicy) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + if (createParent && rpcServer.isPathAll(src)) { + int index = src.lastIndexOf(Path.SEPARATOR); + String parent = src.substring(0, index); + LOG.debug("Creating {} requires creating parent {}", src, parent); + FsPermission parentPermissions = getParentPermission(masked); + mkdirs(parent, parentPermissions, createParent); + asyncApply((ApplyFunction<Boolean, Boolean>) success -> { + if (!success) { + // This shouldn't happen as mkdirs returns true or exception + LOG.error("Couldn't create parents for {}", src); + } + return success; + }); + } + + RemoteMethod method = new RemoteMethod("create", + new Class<?>[] {String.class, FsPermission.class, String.class, + EnumSetWritable.class, boolean.class, short.class, + long.class, CryptoProtocolVersion[].class, + String.class, String.class}, + new RemoteParam(), masked, clientName, flag, createParent, + replication, blockSize, supportedVersions, ecPolicyName, storagePolicy); + final List<RemoteLocation> locations = + rpcServer.getLocationsForPath(src, true); + final RemoteLocation[] createLocation = new RemoteLocation[1]; + asyncTry(() -> { + rpcServer.getCreateLocationAsync(src, locations); + asyncApply((AsyncApplyFunction<RemoteLocation, Object>) remoteLocation -> { + createLocation[0] = remoteLocation; + rpcClient.invokeSingle(remoteLocation, method, HdfsFileStatus.class); + asyncApply((ApplyFunction<HdfsFileStatus, Object>) status -> { + status.setNamespace(remoteLocation.getNameserviceId()); + return status; + }); + }); + }); + asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> { + final List<RemoteLocation> newLocations = checkFaultTolerantRetry( + method, src, ioe, createLocation[0], locations); + rpcClient.invokeSequential( + newLocations, method, HdfsFileStatus.class, null); + }, IOException.class); + + return asyncReturn(HdfsFileStatus.class); + } + + @Override + public LastBlockWithStatus append( + String src, String clientName, + EnumSetWritable<CreateFlag> flag) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("append", + new Class<?>[] {String.class, String.class, EnumSetWritable.class}, + new RemoteParam(), clientName, flag); + rpcClient.invokeSequential(method, locations, LastBlockWithStatus.class, null); + asyncApply((ApplyFunction<RemoteResult, LastBlockWithStatus>) result -> { + LastBlockWithStatus lbws = (LastBlockWithStatus) result.getResult(); + lbws.getFileStatus().setNamespace(result.getLocation().getNameserviceId()); + return lbws; + }); + return asyncReturn(LastBlockWithStatus.class); + } + + @Deprecated + @Override + public boolean rename(final String src, final String dst) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + final List<RemoteLocation> srcLocations = + rpcServer.getLocationsForPath(src, true, false); + final List<RemoteLocation> dstLocations = + rpcServer.getLocationsForPath(dst, false, false); + // srcLocations may be trimmed by getRenameDestinations() + final List<RemoteLocation> locs = new LinkedList<>(srcLocations); + RemoteParam dstParam = getRenameDestinations(locs, dstLocations); + if (locs.isEmpty()) { + asyncComplete( + rbfRename.routerFedRename(src, dst, srcLocations, dstLocations)); + return asyncReturn(Boolean.class); + } + RemoteMethod method = new RemoteMethod("rename", + new Class<?>[] {String.class, String.class}, + new RemoteParam(), dstParam); + isMultiDestDirectory(src); + asyncApply((AsyncApplyFunction<Boolean, Boolean>) isMultiDestDirectory -> { + if (isMultiDestDirectory) { + if (locs.size() != srcLocations.size()) { + throw new IOException("Rename of " + src + " to " + dst + " is not" + + " allowed. The number of remote locations for both source and" + + " target should be same."); + } + rpcClient.invokeAll(locs, method); + } else { + rpcClient.invokeSequential(locs, method, Boolean.class, + Boolean.TRUE); + } + }); + return asyncReturn(Boolean.class); + } + + @Override + public void rename2( + final String src, final String dst, + final Options.Rename... options) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + final List<RemoteLocation> srcLocations = + rpcServer.getLocationsForPath(src, true, false); + final List<RemoteLocation> dstLocations = + rpcServer.getLocationsForPath(dst, false, false); + // srcLocations may be trimmed by getRenameDestinations() + final List<RemoteLocation> locs = new LinkedList<>(srcLocations); + RemoteParam dstParam = getRenameDestinations(locs, dstLocations); + if (locs.isEmpty()) { + rbfRename.routerFedRename(src, dst, srcLocations, dstLocations); + return; + } + RemoteMethod method = new RemoteMethod("rename2", + new Class<?>[] {String.class, String.class, options.getClass()}, + new RemoteParam(), dstParam, options); + isMultiDestDirectory(src); + asyncApply((AsyncApplyFunction<Boolean, Boolean>) isMultiDestDirectory -> { + if (isMultiDestDirectory) { + if (locs.size() != srcLocations.size()) { + throw new IOException("Rename of " + src + " to " + dst + " is not" + + " allowed. The number of remote locations for both source and" + + " target should be same."); + } + rpcClient.invokeConcurrent(locs, method); + } else { + rpcClient.invokeSequential(locs, method, null, null); + } + }); + } + + @Override + public void concat(String trg, String[] src) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + // Concat only effects when all files in the same namespace. + getFileRemoteLocation(trg); + asyncApply((AsyncApplyFunction<RemoteLocation, Object>) targetDestination -> { + if (targetDestination == null) { + throw new IOException("Cannot find target file - " + trg); + } + String targetNameService = targetDestination.getNameserviceId(); + String[] sourceDestinations = new String[src.length]; + int[] index = new int[1]; + asyncForEach(Arrays.stream(src).iterator(), (forEachRun, sourceFile) -> { + getFileRemoteLocation(sourceFile); + asyncApply((ApplyFunction<RemoteLocation, Object>) srcLocation -> { + if (srcLocation == null) { + throw new IOException("Cannot find source file - " + sourceFile); + } + sourceDestinations[index[0]++] = srcLocation.getDest(); + if (!targetNameService.equals(srcLocation.getNameserviceId())) { + throw new IOException("Cannot concatenate source file " + sourceFile + + " because it is located in a different namespace" + " with nameservice " + + srcLocation.getNameserviceId() + " from the target file with nameservice " + + targetNameService); + } + return null; + }); + }); + asyncApply((AsyncApplyFunction<Object, Object>) o -> { + // Invoke + RemoteMethod method = new RemoteMethod("concat", + new Class<?>[] {String.class, String[].class}, + targetDestination.getDest(), sourceDestinations); + rpcClient.invokeSingle(targetDestination, method, Void.class); + }); + }); + } + + @Override + public boolean mkdirs(String src, FsPermission masked, boolean createParent) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + final List<RemoteLocation> locations = + rpcServer.getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("mkdirs", + new Class<?>[] {String.class, FsPermission.class, boolean.class}, + new RemoteParam(), masked, createParent); + + // Create in all locations + if (rpcServer.isPathAll(src)) { + return rpcClient.invokeAll(locations, method); + } + + asyncComplete(false); + if (locations.size() > 1) { + // Check if this directory already exists + asyncTry(() -> { + getFileInfo(src); + asyncApply((ApplyFunction<HdfsFileStatus, Boolean>) fileStatus -> { + if (fileStatus != null) { + // When existing, the NN doesn't return an exception; return true + return true; + } + return false; + }); + }); + asyncCatch((ret, ex) -> { + // Can't query if this file exists or not. + LOG.error("Error getting file info for {} while proxying mkdirs: {}", + src, ex.getMessage()); + return false; + }, IOException.class); + } + + final RemoteLocation firstLocation = locations.get(0); + asyncApply((AsyncApplyFunction<Boolean, Boolean>) success -> { + if (success) { + asyncComplete(true); + return; + } + asyncTry(() -> { + rpcClient.invokeSingle(firstLocation, method, Boolean.class); + }); + + asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> { + final List<RemoteLocation> newLocations = checkFaultTolerantRetry( + method, src, ioe, firstLocation, locations); + rpcClient.invokeSequential( + newLocations, method, Boolean.class, Boolean.TRUE); + }, IOException.class); + }); + + return asyncReturn(Boolean.class); + } + + @Override + public DirectoryListing getListing( + String src, byte[] startAfter, boolean needLocation) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + GetListingComparator comparator = RouterClientProtocol.getComparator(); + getListingInt(src, startAfter, needLocation); + asyncApply((AsyncApplyFunction<List<RemoteResult<RemoteLocation, DirectoryListing>>, Object>) + listings -> { + TreeMap<byte[], HdfsFileStatus> nnListing = new TreeMap<>(comparator); + int totalRemainingEntries = 0; + final int[] remainingEntries = {0}; + boolean namenodeListingExists = false; + // Check the subcluster listing with the smallest name to make sure + // no file is skipped across subclusters + byte[] lastName = null; + if (listings != null) { + for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) { + if (result.hasException()) { + IOException ioe = result.getException(); + if (ioe instanceof FileNotFoundException) { + RemoteLocation location = result.getLocation(); + LOG.debug("Cannot get listing from {}", location); + } else if (!allowPartialList) { + throw ioe; + } + } else if (result.getResult() != null) { + DirectoryListing listing = result.getResult(); + totalRemainingEntries += listing.getRemainingEntries(); + HdfsFileStatus[] partialListing = listing.getPartialListing(); + int length = partialListing.length; + if (length > 0) { + HdfsFileStatus lastLocalEntry = partialListing[length-1]; + byte[] lastLocalName = lastLocalEntry.getLocalNameInBytes(); + if (lastName == null || + comparator.compare(lastName, lastLocalName) > 0) { + lastName = lastLocalName; + } + } + } + } + + // Add existing entries + for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) { + DirectoryListing listing = result.getResult(); + if (listing != null) { + namenodeListingExists = true; + for (HdfsFileStatus file : listing.getPartialListing()) { + byte[] filename = file.getLocalNameInBytes(); + if (totalRemainingEntries > 0 && + comparator.compare(filename, lastName) > 0) { + // Discarding entries further than the lastName + remainingEntries[0]++; + } else { + nnListing.put(filename, file); + } + } + remainingEntries[0] += listing.getRemainingEntries(); + } + } + } + + // Add mount points at this level in the tree + final List<String> children = subclusterResolver.getMountPoints(src); + if (children != null) { + // Get the dates for each mount point + Map<String, Long> dates = getMountPointDates(src); + byte[] finalLastName = lastName; + asyncForEach(children.iterator(), (forEachRun, child) -> { + long date = 0; + if (dates != null && dates.containsKey(child)) { + date = dates.get(child); + } + Path childPath = new Path(src, child); + getMountPointStatus(childPath.toString(), 0, date); + asyncApply((ApplyFunction<HdfsFileStatus, Object>) dirStatus -> { + // if there is no subcluster path, always add mount point + byte[] bChild = DFSUtil.string2Bytes(child); + if (finalLastName == null) { + nnListing.put(bChild, dirStatus); + } else { + if (shouldAddMountPoint(bChild, + finalLastName, startAfter, remainingEntries[0])) { + // This may overwrite existing listing entries with the mount point + // TODO don't add if already there? + nnListing.put(bChild, dirStatus); + } + } + return null; + }); + }); + asyncApply(o -> { + // Update the remaining count to include left mount points + if (nnListing.size() > 0) { + byte[] lastListing = nnListing.lastKey(); + for (int i = 0; i < children.size(); i++) { + byte[] bChild = DFSUtil.string2Bytes(children.get(i)); + if (comparator.compare(bChild, lastListing) > 0) { + remainingEntries[0] += (children.size() - i); + break; + } + } + } + return null; + }); + } + asyncComplete(namenodeListingExists); + asyncApply((ApplyFunction<Boolean, Object>) exists -> { + if (!exists && nnListing.size() == 0 && children == null) { + // NN returns a null object if the directory cannot be found and has no + // listing. If we didn't retrieve any NN listing data, and there are no + // mount points here, return null. + return null; + } + + // Generate combined listing + HdfsFileStatus[] combinedData = new HdfsFileStatus[nnListing.size()]; + combinedData = nnListing.values().toArray(combinedData); + return new DirectoryListing(combinedData, remainingEntries[0]); + }); + }); + return asyncReturn(DirectoryListing.class); + } + + /** + * Get listing on remote locations. + */ + @Override + protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt( + String src, byte[] startAfter, boolean needLocation) throws IOException { + List<RemoteLocation> locations = + rpcServer.getLocationsForPath(src, false, false); + // Locate the dir and fetch the listing. + if (locations.isEmpty()) { + asyncComplete(new ArrayList<>()); + return asyncReturn(List.class); + } + asyncTry(() -> { + RemoteMethod method = new RemoteMethod("getListing", + new Class<?>[] {String.class, startAfter.getClass(), boolean.class}, + new RemoteParam(), startAfter, needLocation); + rpcClient.invokeConcurrent(locations, method, false, -1, + DirectoryListing.class); + }); + asyncCatch((CatchFunction<List, RouterResolveException>) (o, e) -> { + LOG.debug("Cannot get locations for {}, {}.", src, e.getMessage()); + LOG.info("Cannot get locations for {}, {}.", src, e.getMessage()); + return new ArrayList<>(); + }, RouterResolveException.class); + return asyncReturn(List.class); + } + + + @Override + public HdfsFileStatus getFileInfo(String src) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + final IOException[] noLocationException = new IOException[1]; + asyncTry(() -> { + final List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, false, false); + RemoteMethod method = new RemoteMethod("getFileInfo", + new Class<?>[] {String.class}, new RemoteParam()); + // If it's a directory, we check in all locations + if (rpcServer.isPathAll(src)) { + getFileInfoAll(locations, method); + } else { + // Check for file information sequentially + rpcClient.invokeSequential(locations, method, HdfsFileStatus.class, null); + } + }); + asyncCatch((o, e) -> { + if (e instanceof NoLocationException + || e instanceof RouterResolveException) { + noLocationException[0] = e; + } + throw e; + }, IOException.class); + + asyncApply((AsyncApplyFunction<HdfsFileStatus, Object>) ret -> { + // If there is no real path, check mount points + if (ret == null) { + List<String> children = subclusterResolver.getMountPoints(src); + if (children != null && !children.isEmpty()) { + Map<String, Long> dates = getMountPointDates(src); + long date = 0; + if (dates != null && dates.containsKey(src)) { + date = dates.get(src); + } + getMountPointStatus(src, children.size(), date, false); + } else if (children != null) { + // The src is a mount point, but there are no files or directories + getMountPointStatus(src, 0, 0, false); + } else { + asyncComplete(null); + } + asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) result -> { + // Can't find mount point for path and the path didn't contain any sub monit points, + // throw the NoLocationException to client. + if (result == null && noLocationException[0] != null) { + throw noLocationException[0]; + } + + return result; + }); + } else { + asyncComplete(ret); + } + }); + + return asyncReturn(HdfsFileStatus.class); + } + + @Override + public RemoteLocation getFileRemoteLocation(String path) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + final List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false); + if (locations.size() == 1) { + asyncComplete(locations.get(0)); + return asyncReturn(RemoteLocation.class); + } + + asyncForEach(locations.iterator(), (forEachRun, location) -> { + RemoteMethod method = + new RemoteMethod("getFileInfo", new Class<?>[] {String.class}, new RemoteParam()); + rpcClient.invokeSequential(Collections.singletonList(location), method, + HdfsFileStatus.class, null); + asyncApply((ApplyFunction<HdfsFileStatus, RemoteLocation>) ret -> { + if (ret != null) { + forEachRun.breakNow(); + return location; + } + return null; + }); + }); + + return asyncReturn(RemoteLocation.class); + } + + @Override + public HdfsFileStatus getMountPointStatus( + String name, int childrenNum, long date, boolean setPath) { + long modTime = date; + long accessTime = date; + final FsPermission[] permission = new FsPermission[]{FsPermission.getDirDefault()}; + final String[] owner = new String[]{this.superUser}; + final String[] group = new String[]{this.superGroup}; + final int[] childrenNums = new int[]{childrenNum}; + final EnumSet<HdfsFileStatus.Flags>[] flags = + new EnumSet[]{EnumSet.noneOf(HdfsFileStatus.Flags.class)}; + asyncComplete(null); + if (getSubclusterResolver() instanceof MountTableResolver) { + asyncTry(() -> { + String mName = name.startsWith("/") ? name : "/" + name; + MountTableResolver mountTable = (MountTableResolver) subclusterResolver; + MountTable entry = mountTable.getMountPoint(mName); + if (entry != null) { + permission[0] = entry.getMode(); + owner[0] = entry.getOwnerName(); + group[0] = entry.getGroupName(); + + RemoteMethod method = new RemoteMethod("getFileInfo", + new Class<?>[] {String.class}, new RemoteParam()); + getFileInfoAll( + entry.getDestinations(), method, mountStatusTimeOut); + asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) fInfo -> { + if (fInfo != null) { + permission[0] = fInfo.getPermission(); + owner[0] = fInfo.getOwner(); + group[0] = fInfo.getGroup(); + childrenNums[0] = fInfo.getChildrenNum(); + flags[0] = DFSUtil + .getFlags(fInfo.isEncrypted(), fInfo.isErasureCoded(), + fInfo.isSnapshotEnabled(), fInfo.hasAcl()); + } + return fInfo; + }); + } + }); + asyncCatch((CatchFunction<HdfsFileStatus, IOException>) (status, e) -> { + LOG.error("Cannot get mount point: {}", e.getMessage()); + return status; + }, IOException.class); + } else { + try { + UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); + owner[0] = ugi.getUserName(); + group[0] = ugi.getPrimaryGroupName(); + } catch (IOException e) { + String msg = "Cannot get remote user: " + e.getMessage(); + if (UserGroupInformation.isSecurityEnabled()) { + LOG.error(msg); + } else { + LOG.debug(msg); + } + } + } + long inodeId = 0; + HdfsFileStatus.Builder builder = new HdfsFileStatus.Builder(); + asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) status -> { + if (setPath) { + Path path = new Path(name); + String nameStr = path.getName(); + builder.path(DFSUtil.string2Bytes(nameStr)); + } + + return builder.isdir(true) + .mtime(modTime) + .atime(accessTime) + .perm(permission[0]) + .owner(owner[0]) + .group(group[0]) + .symlink(new byte[0]) + .fileId(inodeId) + .children(childrenNums[0]) + .flags(flags[0]) + .build(); + }); + return asyncReturn(HdfsFileStatus.class); + } + + @Override + protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations, + final RemoteMethod method, long timeOutMs) throws IOException { + + asyncComplete(null); + // Get the file info from everybody + rpcClient.invokeConcurrent(locations, method, false, false, timeOutMs, + HdfsFileStatus.class); + asyncApply(res -> { + Map<RemoteLocation, HdfsFileStatus> results = (Map<RemoteLocation, HdfsFileStatus>) res; + int children = 0; + // We return the first file + HdfsFileStatus dirStatus = null; + for (RemoteLocation loc : locations) { + HdfsFileStatus fileStatus = results.get(loc); + if (fileStatus != null) { + children += fileStatus.getChildrenNum(); + if (!fileStatus.isDirectory()) { + return fileStatus; + } else if (dirStatus == null) { + dirStatus = fileStatus; + } + } + } + if (dirStatus != null) { + return updateMountPointStatus(dirStatus, children); + } + return null; + }); + return asyncReturn(HdfsFileStatus.class); + } + + @Override + public boolean recoverLease(String src, String clientName) throws IOException { + super.recoverLease(src, clientName); + return asyncReturn(boolean.class); + } + + @Override + public long[] getStats() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("getStats"); + Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false, long[].class); + asyncApply(o -> { + Map<FederationNamespaceInfo, long[]> results + = (Map<FederationNamespaceInfo, long[]>) o; + long[] combinedData = new long[STATS_ARRAY_LENGTH]; + for (long[] data : results.values()) { + for (int i = 0; i < combinedData.length && i < data.length; i++) { + if (data[i] >= 0) { + combinedData[i] += data[i]; + } + } + } + return combinedData; + }); + return asyncReturn(long[].class); + } + + @Override + public ReplicatedBlockStats getReplicatedBlockStats() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getReplicatedBlockStats"); + Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, + false, ReplicatedBlockStats.class); + asyncApply(o -> { + Map<FederationNamespaceInfo, ReplicatedBlockStats> ret = + (Map<FederationNamespaceInfo, ReplicatedBlockStats>) o; + return ReplicatedBlockStats.merge(ret.values()); + }); + return asyncReturn(ReplicatedBlockStats.class); + } + + @Override + public DatanodeInfo[] getDatanodeReport(HdfsConstants.DatanodeReportType type) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + return rpcServer.getDatanodeReportAsync(type, true, 0); + } + + @Override + public DatanodeStorageReport[] getDatanodeStorageReport( + HdfsConstants.DatanodeReportType type) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + + rpcServer.getDatanodeStorageReportMapAsync(type); + asyncApply((ApplyFunction< Map<String, DatanodeStorageReport[]>, DatanodeStorageReport[]>) + dnSubcluster -> mergeDtanodeStorageReport(dnSubcluster)); + return asyncReturn(DatanodeStorageReport[].class); + } + + public DatanodeStorageReport[] getDatanodeStorageReport( + HdfsConstants.DatanodeReportType type, boolean requireResponse, long timeOutMs) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + + rpcServer.getDatanodeStorageReportMapAsync(type, requireResponse, timeOutMs); + asyncApply((ApplyFunction< Map<String, DatanodeStorageReport[]>, DatanodeStorageReport[]>) + dnSubcluster -> mergeDtanodeStorageReport(dnSubcluster)); + return asyncReturn(DatanodeStorageReport[].class); + } + + @Override + public boolean setSafeMode(HdfsConstants.SafeModeAction action, + boolean isChecked) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + // Set safe mode in all the name spaces + RemoteMethod method = new RemoteMethod("setSafeMode", + new Class<?>[] {HdfsConstants.SafeModeAction.class, boolean.class}, + action, isChecked); + Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent( + nss, method, true, !isChecked, Boolean.class); + + asyncApply(o -> { + Map<FederationNamespaceInfo, Boolean> results + = (Map<FederationNamespaceInfo, Boolean>) o; + // We only report true if all the name space are in safe mode + int numSafemode = 0; + for (boolean safemode : results.values()) { + if (safemode) { + numSafemode++; + } + } + return numSafemode == results.size(); + }); + return asyncReturn(Boolean.class); + } + + @Override + public boolean saveNamespace(long timeWindow, long txGap) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("saveNamespace", + new Class<?>[] {long.class, long.class}, timeWindow, txGap); + final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, + false, boolean.class); + + asyncApply(o -> { + Map<FederationNamespaceInfo, Boolean> ret = + (Map<FederationNamespaceInfo, Boolean>) o; + boolean success = true; + for (boolean s : ret.values()) { + if (!s) { + success = false; + break; + } + } + return success; + }); + return asyncReturn(Boolean.class); + } + + @Override + public long rollEdits() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("rollEdits", new Class<?>[] {}); + final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false, long.class); + asyncApply(o -> { + Map<FederationNamespaceInfo, Long> ret = + (Map<FederationNamespaceInfo, Long>) o; + // Return the maximum txid + long txid = 0; + for (long t : ret.values()) { + if (t > txid) { + txid = t; + } + } + return txid; + }); + return asyncReturn(long.class); + } + + @Override + public boolean restoreFailedStorage(String arg) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("restoreFailedStorage", + new Class<?>[] {String.class}, arg); + final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false, Boolean.class); + asyncApply(o -> { + Map<FederationNamespaceInfo, Boolean> ret = + (Map<FederationNamespaceInfo, Boolean>) o; + boolean success = true; + for (boolean s : ret.values()) { + if (!s) { + success = false; + break; + } + } + return success; + }); + return asyncReturn(boolean.class); + } + + @Override + public RollingUpgradeInfo rollingUpgrade(HdfsConstants.RollingUpgradeAction action) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("rollingUpgrade", + new Class<?>[] {HdfsConstants.RollingUpgradeAction.class}, action); + final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + + rpcClient.invokeConcurrent( + nss, method, true, false, RollingUpgradeInfo.class); + asyncApply(o -> { + Map<FederationNamespaceInfo, RollingUpgradeInfo> ret = + (Map<FederationNamespaceInfo, RollingUpgradeInfo>) o; + // Return the first rolling upgrade info + RollingUpgradeInfo info = null; + for (RollingUpgradeInfo infoNs : ret.values()) { + if (info == null && infoNs != null) { + info = infoNs; + } + } + return info; + }); + return asyncReturn(RollingUpgradeInfo.class); + } + + @Override + public ContentSummary getContentSummary(String path) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + // Get the summaries from regular files + final Collection<ContentSummary> summaries = new ArrayList<>(); + final List<RemoteLocation> locations = getLocationsForContentSummary(path); + final RemoteMethod method = new RemoteMethod("getContentSummary", + new Class<?>[] {String.class}, new RemoteParam()); + rpcClient.invokeConcurrent(locations, method, + false, -1, ContentSummary.class); + + asyncApply(o -> { + final List<RemoteResult<RemoteLocation, ContentSummary>> results = + (List<RemoteResult<RemoteLocation, ContentSummary>>) o; + + FileNotFoundException notFoundException = null; + for (RemoteResult<RemoteLocation, ContentSummary> result : results) { + if (result.hasException()) { + IOException ioe = result.getException(); + if (ioe instanceof FileNotFoundException) { + notFoundException = (FileNotFoundException)ioe; + } else if (!allowPartialList) { + throw ioe; + } + } else if (result.getResult() != null) { + summaries.add(result.getResult()); + } + } + + // Throw original exception if no original nor mount points + if (summaries.isEmpty() && notFoundException != null) { + throw notFoundException; + } + return aggregateContentSummary(summaries); + }); + + return asyncReturn(ContentSummary.class); + } + + @Override + public long getCurrentEditLogTxid() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = new RemoteMethod( + "getCurrentEditLogTxid", new Class<?>[] {}); + final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false, long.class); + + asyncApply(o -> { + Map<FederationNamespaceInfo, Long> ret = + (Map<FederationNamespaceInfo, Long>) o; + // Return the maximum txid + long txid = 0; + for (long t : ret.values()) { + if (t > txid) { + txid = t; + } + } + return txid; + }); + return asyncReturn(long.class); + } + + @Override + public void msync() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, true); + // Only msync to nameservices with observer reads enabled. + Set<FederationNamespaceInfo> allNamespaces = namenodeResolver.getNamespaces(); + RemoteMethod method = new RemoteMethod("msync"); + Set<FederationNamespaceInfo> namespacesEligibleForObserverReads = allNamespaces + .stream() + .filter(ns -> rpcClient.isNamespaceObserverReadEligible(ns.getNameserviceId())) + .collect(Collectors.toSet()); + if (namespacesEligibleForObserverReads.isEmpty()) { + asyncCompleteWith(CompletableFuture.completedFuture(null)); + return; + } + rpcClient.invokeConcurrent(namespacesEligibleForObserverReads, method); + } + + @Override + public boolean setReplication(String src, short replication) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("setReplication", + new Class<?>[] {String.class, short.class}, new RemoteParam(), + replication); + if (rpcServer.isInvokeConcurrent(src)) { + rpcClient.invokeConcurrent(locations, method, Boolean.class); + asyncApply(o -> { + Map<RemoteLocation, Boolean> results = (Map<RemoteLocation, Boolean>) o; + return !results.containsValue(false); + }); + } else { + rpcClient.invokeSequential(locations, method, Boolean.class, + Boolean.TRUE); + } + return asyncReturn(boolean.class); + } + + /** + * Checks if the path is a directory and is supposed to be present in all + * subclusters. + * @param src the source path + * @return true if the path is directory and is supposed to be present in all + * subclusters else false in all other scenarios. + * @throws IOException if unable to get the file status. + */ + @Override + public boolean isMultiDestDirectory(String src) throws IOException { + try { + if (rpcServer.isPathAll(src)) { + List<RemoteLocation> locations; + locations = rpcServer.getLocationsForPath(src, false, false); + RemoteMethod method = new RemoteMethod("getFileInfo", + new Class<?>[] {String.class}, new RemoteParam()); + rpcClient.invokeSequential(locations, + method, HdfsFileStatus.class, null); + CompletableFuture<Object> completableFuture = getCompletableFuture(); + completableFuture = completableFuture.thenApply(o -> { + HdfsFileStatus fileStatus = (HdfsFileStatus) o; + if (fileStatus != null) { + return fileStatus.isDirectory(); + } else { + LOG.debug("The destination {} doesn't exist.", src); + } + return false; + }); + asyncCompleteWith(completableFuture); + return asyncReturn(Boolean.class); + } + } catch (UnresolvedPathException e) { + LOG.debug("The destination {} is a symlink.", src); + } + asyncCompleteWith(CompletableFuture.completedFuture(false)); + return asyncReturn(Boolean.class); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java index cc25516d59f..51a3a1b9c28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java @@ -56,6 +56,7 @@ public class RouterAsyncProtocolTestBase { private FileSystem routerFs; private RouterRpcServer routerRpcServer; private RouterRpcServer routerAsyncRpcServer; + protected static final String TEST_DIR_PATH = "/testdir"; @BeforeClass public static void setUpCluster() throws Exception { @@ -114,19 +115,20 @@ public void setUp() throws IOException { routerRpcServer.getRouterStateIdContext()); routerAsyncRpcServer = Mockito.spy(routerRpcServer); Mockito.when(routerAsyncRpcServer.getRPCClient()).thenReturn(asyncRpcClient); + Mockito.when(routerAsyncRpcServer.isAsync()).thenReturn(true); // Create mock locations MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver(); resolver.addLocation("/", ns0, "/"); FsPermission permission = new FsPermission("705"); - routerFs.mkdirs(new Path("/testdir"), permission); + routerFs.mkdirs(new Path(TEST_DIR_PATH), permission); } @After public void tearDown() throws IOException { // clear client context CallerContext.setCurrent(null); - boolean delete = routerFs.delete(new Path("/testdir")); + boolean delete = routerFs.delete(new Path(TEST_DIR_PATH)); assertTrue(delete); if (routerFs != null) { routerFs.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncClientProtocol.java new file mode 100644 index 00000000000..96f957f93df --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncClientProtocol.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.util.Lists; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; + +import static org.apache.hadoop.crypto.CryptoProtocolVersion.ENCRYPTION_ZONES; +import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT; +import static org.apache.hadoop.fs.permission.AclEntryType.USER; +import static org.apache.hadoop.fs.permission.FsAction.ALL; +import static org.apache.hadoop.fs.permission.FsAction.NONE; +import static org.apache.hadoop.fs.permission.FsAction.READ; +import static org.apache.hadoop.fs.permission.FsAction.READ_WRITE; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; +import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Used to test the functionality of {@link RouterAsyncClientProtocol}. + */ +public class TestRouterAsyncClientProtocol extends RouterAsyncProtocolTestBase { + private RouterAsyncClientProtocol asyncClientProtocol; + private RouterClientProtocol clientProtocol; + private final String testPath = TEST_DIR_PATH + "/test"; + + @Before + public void setup() throws IOException { + asyncClientProtocol = new RouterAsyncClientProtocol(getRouterConf(), getRouterAsyncRpcServer()); + clientProtocol = new RouterClientProtocol(getRouterConf(), getRouterRpcServer()); + } + + @Test + public void testGetServerDefaults() throws Exception { + FsServerDefaults serverDefaults = clientProtocol.getServerDefaults(); + asyncClientProtocol.getServerDefaults(); + FsServerDefaults fsServerDefaults = syncReturn(FsServerDefaults.class); + assertEquals(serverDefaults.getBlockSize(), fsServerDefaults.getBlockSize()); + assertEquals(serverDefaults.getReplication(), fsServerDefaults.getReplication()); + assertEquals(serverDefaults.getChecksumType(), fsServerDefaults.getChecksumType()); + assertEquals( + serverDefaults.getDefaultStoragePolicyId(), fsServerDefaults.getDefaultStoragePolicyId()); + } + + @Test + public void testClientProtocolRpc() throws Exception { + asyncClientProtocol.mkdirs(testPath, new FsPermission(ALL, ALL, ALL), false); + Boolean success = syncReturn(Boolean.class); + assertTrue(success); + + asyncClientProtocol.setPermission(testPath, new FsPermission(READ_WRITE, READ, NONE)); + syncReturn(Void.class); + + asyncClientProtocol.getFileInfo(testPath); + HdfsFileStatus hdfsFileStatus = syncReturn(HdfsFileStatus.class); + assertEquals(hdfsFileStatus.getPermission(), new FsPermission(READ_WRITE, READ, NONE)); + + List<AclEntry> aclSpec = Lists.newArrayList(aclEntry(DEFAULT, USER, "tmpUser", ALL)); + asyncClientProtocol.setAcl(testPath, aclSpec); + syncReturn(Void.class); + asyncClientProtocol.setOwner(testPath, "tmpUser", "tmpUserGroup"); + syncReturn(Void.class); + + asyncClientProtocol.getFileInfo(testPath); + hdfsFileStatus = syncReturn(HdfsFileStatus.class); + assertEquals("tmpUser", hdfsFileStatus.getOwner()); + assertEquals("tmpUserGroup", hdfsFileStatus.getGroup()); + + asyncClientProtocol.create(testPath + "/testCreate.file", + new FsPermission(ALL, ALL, ALL), "testAsyncClient", + new EnumSetWritable<>(EnumSet.of(CreateFlag.CREATE)), + false, (short) 1, 128 * 1024 * 1024L, + new CryptoProtocolVersion[]{ENCRYPTION_ZONES}, + null, null); + hdfsFileStatus = syncReturn(HdfsFileStatus.class); + assertTrue(hdfsFileStatus.isFile()); + assertEquals(128 * 1024 * 1024, hdfsFileStatus.getBlockSize()); + + asyncClientProtocol.getFileRemoteLocation(testPath); + RemoteLocation remoteLocation = syncReturn(RemoteLocation.class); + assertNotNull(remoteLocation); + assertEquals(getNs0(), remoteLocation.getNameserviceId()); + assertEquals(testPath, remoteLocation.getSrc()); + + asyncClientProtocol.getListing(testPath, new byte[1], true); + DirectoryListing directoryListing = syncReturn(DirectoryListing.class); + assertEquals(1, directoryListing.getPartialListing().length); + + asyncClientProtocol.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL); + DatanodeInfo[] datanodeInfos = syncReturn(DatanodeInfo[].class); + assertEquals(3, datanodeInfos.length); + + asyncClientProtocol.createSymlink(testPath + "/testCreate.file", + "/link/link.file", new FsPermission(ALL, ALL, ALL), true); + syncReturn(Void.class); + + asyncClientProtocol.getFileLinkInfo("/link/link.file"); + hdfsFileStatus = syncReturn(HdfsFileStatus.class); + assertEquals("testCreate.file", hdfsFileStatus.getSymlink().getName()); + + asyncClientProtocol.rename(testPath + "/testCreate.file", + testPath + "/testRename.file"); + success = syncReturn(boolean.class); + assertTrue(success); + + asyncClientProtocol.delete(testPath, true); + success = syncReturn(boolean.class); + assertTrue(success); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org