This is an automated email from the ASF dual-hosted git repository. keepromise pushed a commit to branch revert-7188-HDFS-17640 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit ddbddf3a0a8504b0577867b575eb8393f86e7021 Author: Jian Zhang <keeprom...@apache.org> AuthorDate: Tue Dec 31 15:35:45 2024 +0800 Revert "HDFS-17640.[ARR] RouterClientProtocol supports asynchronous rpc. (#7188)" This reverts commit c48db62c193fb54784a6aa66e40511437c2d5ff6. --- .../federation/router/RouterClientProtocol.java | 133 +-- .../federation/router/RouterFederationRename.java | 2 +- .../server/federation/router/RouterRpcClient.java | 2 +- .../server/federation/router/RouterRpcServer.java | 20 +- .../router/async/RouterAsyncClientProtocol.java | 1083 -------------------- .../router/async/RouterAsyncProtocolTestBase.java | 6 +- .../async/TestRouterAsyncClientProtocol.java | 144 --- 7 files changed, 38 insertions(+), 1352 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index cab4fad1909..d5064821905 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -85,10 +85,6 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException; -import org.apache.hadoop.hdfs.server.federation.router.async.AsyncErasureCoding; -import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncCacheAdmin; -import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncSnapshot; -import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncStoragePolicy; import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -170,7 +166,7 @@ public class RouterClientProtocol implements ClientProtocol { /** Router security manager to handle token operations. */ private RouterSecurityManager securityManager = null; - public RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) { + RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) { this.rpcServer = rpcServer; this.rpcClient = rpcServer.getRPCClient(); this.subclusterResolver = rpcServer.getSubclusterResolver(); @@ -198,17 +194,10 @@ public class RouterClientProtocol implements ClientProtocol { this.superGroup = conf.get( DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); - if (rpcServer.isAsync()) { - this.erasureCoding = new AsyncErasureCoding(rpcServer); - this.storagePolicy = new RouterAsyncStoragePolicy(rpcServer); - this.snapshotProto = new RouterAsyncSnapshot(rpcServer); - this.routerCacheAdmin = new RouterAsyncCacheAdmin(rpcServer); - } else { - this.erasureCoding = new ErasureCoding(rpcServer); - this.storagePolicy = new RouterStoragePolicy(rpcServer); - this.snapshotProto = new RouterSnapshot(rpcServer); - this.routerCacheAdmin = new RouterCacheAdmin(rpcServer); - } + this.erasureCoding = new ErasureCoding(rpcServer); + this.storagePolicy = new RouterStoragePolicy(rpcServer); + this.snapshotProto = new RouterSnapshot(rpcServer); + this.routerCacheAdmin = new RouterCacheAdmin(rpcServer); this.securityManager = rpcServer.getRouterSecurityManager(); this.rbfRename = new RouterFederationRename(rpcServer, conf); this.defaultNameServiceEnabled = conf.getBoolean( @@ -358,7 +347,7 @@ public class RouterClientProtocol implements ClientProtocol { * @throws IOException If this path is not fault tolerant or the exception * should not be retried (e.g., NSQuotaExceededException). */ - protected List<RemoteLocation> checkFaultTolerantRetry( + private List<RemoteLocation> checkFaultTolerantRetry( final RemoteMethod method, final String src, final IOException ioe, final RemoteLocation excludeLoc, final List<RemoteLocation> locations) throws IOException { @@ -831,7 +820,7 @@ public class RouterClientProtocol implements ClientProtocol { /** * For {@link #getListing(String,byte[],boolean) GetLisiting} to sort results. */ - protected static class GetListingComparator + private static class GetListingComparator implements Comparator<byte[]>, Serializable { @Override public int compare(byte[] o1, byte[] o2) { @@ -842,10 +831,6 @@ public class RouterClientProtocol implements ClientProtocol { private static GetListingComparator comparator = new GetListingComparator(); - public static GetListingComparator getComparator() { - return comparator; - } - @Override public DirectoryListing getListing(String src, byte[] startAfter, boolean needLocation) throws IOException { @@ -1119,7 +1104,7 @@ public class RouterClientProtocol implements ClientProtocol { return mergeDtanodeStorageReport(dnSubcluster); } - protected DatanodeStorageReport[] mergeDtanodeStorageReport( + private DatanodeStorageReport[] mergeDtanodeStorageReport( Map<String, DatanodeStorageReport[]> dnSubcluster) { // Avoid repeating machines in multiple subclusters Map<String, DatanodeStorageReport> datanodesMap = new LinkedHashMap<>(); @@ -1350,23 +1335,20 @@ public class RouterClientProtocol implements ClientProtocol { } /** - * Get all the locations of the path for {@link RouterClientProtocol#getContentSummary(String)}. + * Get all the locations of the path for {@link this#getContentSummary(String)}. * For example, there are some mount points: - * <p> - * /a - [ns0 - /a] - * /a/b - [ns0 - /a/b] - * /a/b/c - [ns1 - /a/b/c] - * </p> + * /a -> ns0 -> /a + * /a/b -> ns0 -> /a/b + * /a/b/c -> ns1 -> /a/b/c * When the path is '/a', the result of locations should be * [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')] * When the path is '/b', will throw NoLocationException. - * * @param path the path to get content summary * @return one list contains all the remote location - * @throws IOException if an I/O error occurs + * @throws IOException */ @VisibleForTesting - protected List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException { + List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException { // Try to get all the locations of the path. final Map<String, List<RemoteLocation>> ns2Locations = getAllLocations(path); if (ns2Locations.isEmpty()) { @@ -2057,7 +2039,7 @@ public class RouterClientProtocol implements ClientProtocol { * replacement value. * @throws IOException If the dst paths could not be determined. */ - protected RemoteParam getRenameDestinations( + private RemoteParam getRenameDestinations( final List<RemoteLocation> srcLocations, final List<RemoteLocation> dstLocations) throws IOException { @@ -2105,7 +2087,7 @@ public class RouterClientProtocol implements ClientProtocol { * @param summaries Collection of individual summaries. * @return Aggregated content summary. */ - protected ContentSummary aggregateContentSummary( + private ContentSummary aggregateContentSummary( Collection<ContentSummary> summaries) { if (summaries.size() == 1) { return summaries.iterator().next(); @@ -2160,7 +2142,7 @@ public class RouterClientProtocol implements ClientProtocol { * everywhere. * @throws IOException If all the locations throw an exception. */ - protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations, + private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations, final RemoteMethod method) throws IOException { return getFileInfoAll(locations, method, -1); } @@ -2175,7 +2157,7 @@ public class RouterClientProtocol implements ClientProtocol { * everywhere. * @throws IOException If all the locations throw an exception. */ - protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations, + private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations, final RemoteMethod method, long timeOutMs) throws IOException { // Get the file info from everybody @@ -2204,11 +2186,12 @@ public class RouterClientProtocol implements ClientProtocol { /** * Get the permissions for the parent of a child with given permissions. - * Add implicit u+wx permission for parent. This is based on FSDirMkdirOp#addImplicitUwx. + * Add implicit u+wx permission for parent. This is based on + * @{FSDirMkdirOp#addImplicitUwx}. * @param mask The permission mask of the child. * @return The permission mask of the parent. */ - protected static FsPermission getParentPermission(final FsPermission mask) { + private static FsPermission getParentPermission(final FsPermission mask) { FsPermission ret = new FsPermission( mask.getUserAction().or(FsAction.WRITE_EXECUTE), mask.getGroupAction(), @@ -2225,7 +2208,7 @@ public class RouterClientProtocol implements ClientProtocol { * @return New HDFS file status representing a mount point. */ @VisibleForTesting - protected HdfsFileStatus getMountPointStatus( + HdfsFileStatus getMountPointStatus( String name, int childrenNum, long date) { return getMountPointStatus(name, childrenNum, date, true); } @@ -2240,7 +2223,7 @@ public class RouterClientProtocol implements ClientProtocol { * @return New HDFS file status representing a mount point. */ @VisibleForTesting - protected HdfsFileStatus getMountPointStatus( + HdfsFileStatus getMountPointStatus( String name, int childrenNum, long date, boolean setPath) { long modTime = date; long accessTime = date; @@ -2317,7 +2300,7 @@ public class RouterClientProtocol implements ClientProtocol { * @param path Name of the path to start checking dates from. * @return Map with the modification dates for all sub-entries. */ - protected Map<String, Long> getMountPointDates(String path) { + private Map<String, Long> getMountPointDates(String path) { Map<String, Long> ret = new TreeMap<>(); if (subclusterResolver instanceof MountTableResolver) { try { @@ -2378,15 +2361,9 @@ public class RouterClientProtocol implements ClientProtocol { } /** - * Get a partial listing of the indicated directory. - * - * @param src the directory name - * @param startAfter the name to start after - * @param needLocation if blockLocations need to be returned - * @return a partial listing starting after startAfter - * @throws IOException if other I/O error occurred + * Get listing on remote locations. */ - protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt( + private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt( String src, byte[] startAfter, boolean needLocation) throws IOException { try { List<RemoteLocation> locations = @@ -2423,9 +2400,9 @@ public class RouterClientProtocol implements ClientProtocol { * @param startAfter starting listing from client, used to define listing * start boundary * @param remainingEntries how many entries left from subcluster - * @return true if should add mount point, otherwise false; + * @return */ - protected static boolean shouldAddMountPoint( + private static boolean shouldAddMountPoint( byte[] mountPoint, byte[] lastEntry, byte[] startAfter, int remainingEntries) { if (comparator.compare(mountPoint, startAfter) > 0 && @@ -2448,7 +2425,7 @@ public class RouterClientProtocol implements ClientProtocol { * @throws IOException if unable to get the file status. */ @VisibleForTesting - protected boolean isMultiDestDirectory(String src) throws IOException { + boolean isMultiDestDirectory(String src) throws IOException { try { if (rpcServer.isPathAll(src)) { List<RemoteLocation> locations; @@ -2472,56 +2449,4 @@ public class RouterClientProtocol implements ClientProtocol { public int getRouterFederationRenameCount() { return rbfRename.getRouterFederationRenameCount(); } - - public RouterRpcServer getRpcServer() { - return rpcServer; - } - - public RouterRpcClient getRpcClient() { - return rpcClient; - } - - public FileSubclusterResolver getSubclusterResolver() { - return subclusterResolver; - } - - public ActiveNamenodeResolver getNamenodeResolver() { - return namenodeResolver; - } - - public long getServerDefaultsLastUpdate() { - return serverDefaultsLastUpdate; - } - - public long getServerDefaultsValidityPeriod() { - return serverDefaultsValidityPeriod; - } - - public boolean isAllowPartialList() { - return allowPartialList; - } - - public long getMountStatusTimeOut() { - return mountStatusTimeOut; - } - - public String getSuperUser() { - return superUser; - } - - public String getSuperGroup() { - return superGroup; - } - - public RouterStoragePolicy getStoragePolicy() { - return storagePolicy; - } - - public void setServerDefaultsLastUpdate(long serverDefaultsLastUpdate) { - this.serverDefaultsLastUpdate = serverDefaultsLastUpdate; - } - - public RouterFederationRename getRbfRename() { - return rbfRename; - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java index 772e7257888..aafb685b886 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java @@ -93,7 +93,7 @@ public class RouterFederationRename { * @throws IOException if rename fails. * @return true if rename succeeds. */ - public boolean routerFedRename(final String src, final String dst, + boolean routerFedRename(final String src, final String dst, final List<RemoteLocation> srcLocations, final List<RemoteLocation> dstLocations) throws IOException { if (!rpcServer.isEnableRenameAcrossNamespace()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index c7c3699f33e..e07de092dd7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1969,7 +1969,7 @@ public class RouterRpcClient { * @param nsId namespaceID * @return whether the 'namespace' has observer reads enabled. */ - public boolean isNamespaceObserverReadEligible(String nsId) { + boolean isNamespaceObserverReadEligible(String nsId) { return observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 0c1d3dfbdec..39a50d4e3a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -75,11 +75,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil; -import org.apache.hadoop.hdfs.server.federation.router.async.AsyncQuota; -import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncClientProtocol; -import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncNamenodeProtocol; import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncRpcClient; -import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncUserProtocol; import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction; import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction; import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction; @@ -292,7 +288,6 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, * @param fileResolver File resolver to resolve file paths to subclusters. * @throws IOException If the RPC server could not be created. */ - @SuppressWarnings("checkstyle:MethodLength") public RouterRpcServer(Configuration conf, Router router, ActiveNamenodeResolver nnResolver, FileSubclusterResolver fileResolver) throws IOException { @@ -429,19 +424,14 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, if (this.enableAsync) { this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router, this.namenodeResolver, this.rpcMonitor, routerStateIdContext); - this.clientProto = new RouterAsyncClientProtocol(conf, this); - this.nnProto = new RouterAsyncNamenodeProtocol(this); - this.routerProto = new RouterAsyncUserProtocol(this); - this.quotaCall = new AsyncQuota(this.router, this); } else { this.rpcClient = new RouterRpcClient(this.conf, this.router, this.namenodeResolver, this.rpcMonitor, routerStateIdContext); - this.clientProto = new RouterClientProtocol(conf, this); - this.nnProto = new RouterNamenodeProtocol(this); - this.routerProto = new RouterUserProtocol(this); - this.quotaCall = new Quota(this.router, this); } - + this.nnProto = new RouterNamenodeProtocol(this); + this.quotaCall = new Quota(this.router, this); + this.clientProto = new RouterClientProtocol(conf, this); + this.routerProto = new RouterUserProtocol(this); long dnCacheExpire = conf.getTimeDuration( DN_REPORT_CACHE_EXPIRE, DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS); @@ -2203,7 +2193,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, * @param path Path to check. * @return If a path should be in all subclusters. */ - public boolean isPathAll(final String path) { + boolean isPathAll(final String path) { MountTable entry = getMountTable(path); return entry != null && entry.isAll(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java deleted file mode 100644 index ae44f7aaf1d..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java +++ /dev/null @@ -1,1083 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router.async; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.crypto.CryptoProtocolVersion; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FsServerDefaults; -import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.DirectoryListing; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; -import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; -import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; -import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; -import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; -import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; -import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; -import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; -import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; -import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException; -import org.apache.hadoop.hdfs.server.federation.router.NoLocationException; -import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; -import org.apache.hadoop.hdfs.server.federation.router.RemoteParam; -import org.apache.hadoop.hdfs.server.federation.router.RemoteResult; -import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol; -import org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename; -import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient; -import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; -import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction; -import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction; -import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction; -import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction; -import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; -import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; -import org.apache.hadoop.io.EnumSetWritable; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; - -import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.updateMountPointStatus; -import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; -import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch; -import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete; -import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith; -import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach; -import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn; -import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry; -import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.getCompletableFuture; - -/** - * Module that implements all the async RPC calls in {@link ClientProtocol} in the - * {@link RouterRpcServer}. - */ -public class RouterAsyncClientProtocol extends RouterClientProtocol { - private static final Logger LOG = - LoggerFactory.getLogger(RouterAsyncClientProtocol.class.getName()); - - private final RouterRpcServer rpcServer; - private final RouterRpcClient rpcClient; - private final RouterFederationRename rbfRename; - private final FileSubclusterResolver subclusterResolver; - private final ActiveNamenodeResolver namenodeResolver; - /** If it requires response from all subclusters. */ - private final boolean allowPartialList; - /** Time out when getting the mount statistics. */ - private long mountStatusTimeOut; - /** Identifier for the super user. */ - private String superUser; - /** Identifier for the super group. */ - private final String superGroup; - /** - * Caching server defaults so as to prevent redundant calls to namenode, - * similar to DFSClient, caching saves efforts when router connects - * to multiple clients. - */ - private volatile FsServerDefaults serverDefaults; - - public RouterAsyncClientProtocol(Configuration conf, RouterRpcServer rpcServer) { - super(conf, rpcServer); - this.rpcServer = rpcServer; - this.rpcClient = rpcServer.getRPCClient(); - this.rbfRename = getRbfRename(); - this.subclusterResolver = getSubclusterResolver(); - this.namenodeResolver = getNamenodeResolver(); - this.allowPartialList = isAllowPartialList(); - this.mountStatusTimeOut = getMountStatusTimeOut(); - this.superUser = getSuperUser(); - this.superGroup = getSuperGroup(); - } - - @Override - public FsServerDefaults getServerDefaults() throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.READ); - long now = Time.monotonicNow(); - if ((serverDefaults == null) || (now - getServerDefaultsLastUpdate() - > getServerDefaultsValidityPeriod())) { - RemoteMethod method = new RemoteMethod("getServerDefaults"); - rpcServer.invokeAtAvailableNsAsync(method, FsServerDefaults.class); - asyncApply(o -> { - serverDefaults = (FsServerDefaults) o; - setServerDefaultsLastUpdate(now); - return serverDefaults; - }); - } else { - asyncComplete(serverDefaults); - } - return asyncReturn(FsServerDefaults.class); - } - - @Override - public HdfsFileStatus create(String src, FsPermission masked, - String clientName, EnumSetWritable<CreateFlag> flag, - boolean createParent, short replication, long blockSize, - CryptoProtocolVersion[] supportedVersions, String ecPolicyName, - String storagePolicy) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE); - - if (createParent && rpcServer.isPathAll(src)) { - int index = src.lastIndexOf(Path.SEPARATOR); - String parent = src.substring(0, index); - LOG.debug("Creating {} requires creating parent {}", src, parent); - FsPermission parentPermissions = getParentPermission(masked); - mkdirs(parent, parentPermissions, createParent); - asyncApply((ApplyFunction<Boolean, Boolean>) success -> { - if (!success) { - // This shouldn't happen as mkdirs returns true or exception - LOG.error("Couldn't create parents for {}", src); - } - return success; - }); - } - - RemoteMethod method = new RemoteMethod("create", - new Class<?>[] {String.class, FsPermission.class, String.class, - EnumSetWritable.class, boolean.class, short.class, - long.class, CryptoProtocolVersion[].class, - String.class, String.class}, - new RemoteParam(), masked, clientName, flag, createParent, - replication, blockSize, supportedVersions, ecPolicyName, storagePolicy); - final List<RemoteLocation> locations = - rpcServer.getLocationsForPath(src, true); - final RemoteLocation[] createLocation = new RemoteLocation[1]; - asyncTry(() -> { - rpcServer.getCreateLocationAsync(src, locations); - asyncApply((AsyncApplyFunction<RemoteLocation, Object>) remoteLocation -> { - createLocation[0] = remoteLocation; - rpcClient.invokeSingle(remoteLocation, method, HdfsFileStatus.class); - asyncApply((ApplyFunction<HdfsFileStatus, Object>) status -> { - status.setNamespace(remoteLocation.getNameserviceId()); - return status; - }); - }); - }); - asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> { - final List<RemoteLocation> newLocations = checkFaultTolerantRetry( - method, src, ioe, createLocation[0], locations); - rpcClient.invokeSequential( - newLocations, method, HdfsFileStatus.class, null); - }, IOException.class); - - return asyncReturn(HdfsFileStatus.class); - } - - @Override - public LastBlockWithStatus append( - String src, String clientName, - EnumSetWritable<CreateFlag> flag) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE); - - List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("append", - new Class<?>[] {String.class, String.class, EnumSetWritable.class}, - new RemoteParam(), clientName, flag); - rpcClient.invokeSequential(method, locations, LastBlockWithStatus.class, null); - asyncApply((ApplyFunction<RemoteResult, LastBlockWithStatus>) result -> { - LastBlockWithStatus lbws = (LastBlockWithStatus) result.getResult(); - lbws.getFileStatus().setNamespace(result.getLocation().getNameserviceId()); - return lbws; - }); - return asyncReturn(LastBlockWithStatus.class); - } - - @Deprecated - @Override - public boolean rename(final String src, final String dst) - throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE); - - final List<RemoteLocation> srcLocations = - rpcServer.getLocationsForPath(src, true, false); - final List<RemoteLocation> dstLocations = - rpcServer.getLocationsForPath(dst, false, false); - // srcLocations may be trimmed by getRenameDestinations() - final List<RemoteLocation> locs = new LinkedList<>(srcLocations); - RemoteParam dstParam = getRenameDestinations(locs, dstLocations); - if (locs.isEmpty()) { - asyncComplete( - rbfRename.routerFedRename(src, dst, srcLocations, dstLocations)); - return asyncReturn(Boolean.class); - } - RemoteMethod method = new RemoteMethod("rename", - new Class<?>[] {String.class, String.class}, - new RemoteParam(), dstParam); - isMultiDestDirectory(src); - asyncApply((AsyncApplyFunction<Boolean, Boolean>) isMultiDestDirectory -> { - if (isMultiDestDirectory) { - if (locs.size() != srcLocations.size()) { - throw new IOException("Rename of " + src + " to " + dst + " is not" - + " allowed. The number of remote locations for both source and" - + " target should be same."); - } - rpcClient.invokeAll(locs, method); - } else { - rpcClient.invokeSequential(locs, method, Boolean.class, - Boolean.TRUE); - } - }); - return asyncReturn(Boolean.class); - } - - @Override - public void rename2( - final String src, final String dst, - final Options.Rename... options) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE); - - final List<RemoteLocation> srcLocations = - rpcServer.getLocationsForPath(src, true, false); - final List<RemoteLocation> dstLocations = - rpcServer.getLocationsForPath(dst, false, false); - // srcLocations may be trimmed by getRenameDestinations() - final List<RemoteLocation> locs = new LinkedList<>(srcLocations); - RemoteParam dstParam = getRenameDestinations(locs, dstLocations); - if (locs.isEmpty()) { - rbfRename.routerFedRename(src, dst, srcLocations, dstLocations); - return; - } - RemoteMethod method = new RemoteMethod("rename2", - new Class<?>[] {String.class, String.class, options.getClass()}, - new RemoteParam(), dstParam, options); - isMultiDestDirectory(src); - asyncApply((AsyncApplyFunction<Boolean, Boolean>) isMultiDestDirectory -> { - if (isMultiDestDirectory) { - if (locs.size() != srcLocations.size()) { - throw new IOException("Rename of " + src + " to " + dst + " is not" - + " allowed. The number of remote locations for both source and" - + " target should be same."); - } - rpcClient.invokeConcurrent(locs, method); - } else { - rpcClient.invokeSequential(locs, method, null, null); - } - }); - } - - @Override - public void concat(String trg, String[] src) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE); - - // Concat only effects when all files in the same namespace. - getFileRemoteLocation(trg); - asyncApply((AsyncApplyFunction<RemoteLocation, Object>) targetDestination -> { - if (targetDestination == null) { - throw new IOException("Cannot find target file - " + trg); - } - String targetNameService = targetDestination.getNameserviceId(); - String[] sourceDestinations = new String[src.length]; - int[] index = new int[1]; - asyncForEach(Arrays.stream(src).iterator(), (forEachRun, sourceFile) -> { - getFileRemoteLocation(sourceFile); - asyncApply((ApplyFunction<RemoteLocation, Object>) srcLocation -> { - if (srcLocation == null) { - throw new IOException("Cannot find source file - " + sourceFile); - } - sourceDestinations[index[0]++] = srcLocation.getDest(); - if (!targetNameService.equals(srcLocation.getNameserviceId())) { - throw new IOException("Cannot concatenate source file " + sourceFile - + " because it is located in a different namespace" + " with nameservice " - + srcLocation.getNameserviceId() + " from the target file with nameservice " - + targetNameService); - } - return null; - }); - }); - asyncApply((AsyncApplyFunction<Object, Object>) o -> { - // Invoke - RemoteMethod method = new RemoteMethod("concat", - new Class<?>[] {String.class, String[].class}, - targetDestination.getDest(), sourceDestinations); - rpcClient.invokeSingle(targetDestination, method, Void.class); - }); - }); - } - - @Override - public boolean mkdirs(String src, FsPermission masked, boolean createParent) - throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE); - - final List<RemoteLocation> locations = - rpcServer.getLocationsForPath(src, false); - RemoteMethod method = new RemoteMethod("mkdirs", - new Class<?>[] {String.class, FsPermission.class, boolean.class}, - new RemoteParam(), masked, createParent); - - // Create in all locations - if (rpcServer.isPathAll(src)) { - return rpcClient.invokeAll(locations, method); - } - - asyncComplete(false); - if (locations.size() > 1) { - // Check if this directory already exists - asyncTry(() -> { - getFileInfo(src); - asyncApply((ApplyFunction<HdfsFileStatus, Boolean>) fileStatus -> { - if (fileStatus != null) { - // When existing, the NN doesn't return an exception; return true - return true; - } - return false; - }); - }); - asyncCatch((ret, ex) -> { - // Can't query if this file exists or not. - LOG.error("Error getting file info for {} while proxying mkdirs: {}", - src, ex.getMessage()); - return false; - }, IOException.class); - } - - final RemoteLocation firstLocation = locations.get(0); - asyncApply((AsyncApplyFunction<Boolean, Boolean>) success -> { - if (success) { - asyncComplete(true); - return; - } - asyncTry(() -> { - rpcClient.invokeSingle(firstLocation, method, Boolean.class); - }); - - asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> { - final List<RemoteLocation> newLocations = checkFaultTolerantRetry( - method, src, ioe, firstLocation, locations); - rpcClient.invokeSequential( - newLocations, method, Boolean.class, Boolean.TRUE); - }, IOException.class); - }); - - return asyncReturn(Boolean.class); - } - - @Override - public DirectoryListing getListing( - String src, byte[] startAfter, boolean needLocation) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.READ); - GetListingComparator comparator = RouterClientProtocol.getComparator(); - getListingInt(src, startAfter, needLocation); - asyncApply((AsyncApplyFunction<List<RemoteResult<RemoteLocation, DirectoryListing>>, Object>) - listings -> { - TreeMap<byte[], HdfsFileStatus> nnListing = new TreeMap<>(comparator); - int totalRemainingEntries = 0; - final int[] remainingEntries = {0}; - boolean namenodeListingExists = false; - // Check the subcluster listing with the smallest name to make sure - // no file is skipped across subclusters - byte[] lastName = null; - if (listings != null) { - for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) { - if (result.hasException()) { - IOException ioe = result.getException(); - if (ioe instanceof FileNotFoundException) { - RemoteLocation location = result.getLocation(); - LOG.debug("Cannot get listing from {}", location); - } else if (!allowPartialList) { - throw ioe; - } - } else if (result.getResult() != null) { - DirectoryListing listing = result.getResult(); - totalRemainingEntries += listing.getRemainingEntries(); - HdfsFileStatus[] partialListing = listing.getPartialListing(); - int length = partialListing.length; - if (length > 0) { - HdfsFileStatus lastLocalEntry = partialListing[length-1]; - byte[] lastLocalName = lastLocalEntry.getLocalNameInBytes(); - if (lastName == null || - comparator.compare(lastName, lastLocalName) > 0) { - lastName = lastLocalName; - } - } - } - } - - // Add existing entries - for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) { - DirectoryListing listing = result.getResult(); - if (listing != null) { - namenodeListingExists = true; - for (HdfsFileStatus file : listing.getPartialListing()) { - byte[] filename = file.getLocalNameInBytes(); - if (totalRemainingEntries > 0 && - comparator.compare(filename, lastName) > 0) { - // Discarding entries further than the lastName - remainingEntries[0]++; - } else { - nnListing.put(filename, file); - } - } - remainingEntries[0] += listing.getRemainingEntries(); - } - } - } - - // Add mount points at this level in the tree - final List<String> children = subclusterResolver.getMountPoints(src); - if (children != null) { - // Get the dates for each mount point - Map<String, Long> dates = getMountPointDates(src); - byte[] finalLastName = lastName; - asyncForEach(children.iterator(), (forEachRun, child) -> { - long date = 0; - if (dates != null && dates.containsKey(child)) { - date = dates.get(child); - } - Path childPath = new Path(src, child); - getMountPointStatus(childPath.toString(), 0, date); - asyncApply((ApplyFunction<HdfsFileStatus, Object>) dirStatus -> { - // if there is no subcluster path, always add mount point - byte[] bChild = DFSUtil.string2Bytes(child); - if (finalLastName == null) { - nnListing.put(bChild, dirStatus); - } else { - if (shouldAddMountPoint(bChild, - finalLastName, startAfter, remainingEntries[0])) { - // This may overwrite existing listing entries with the mount point - // TODO don't add if already there? - nnListing.put(bChild, dirStatus); - } - } - return null; - }); - }); - asyncApply(o -> { - // Update the remaining count to include left mount points - if (nnListing.size() > 0) { - byte[] lastListing = nnListing.lastKey(); - for (int i = 0; i < children.size(); i++) { - byte[] bChild = DFSUtil.string2Bytes(children.get(i)); - if (comparator.compare(bChild, lastListing) > 0) { - remainingEntries[0] += (children.size() - i); - break; - } - } - } - return null; - }); - } - asyncComplete(namenodeListingExists); - asyncApply((ApplyFunction<Boolean, Object>) exists -> { - if (!exists && nnListing.size() == 0 && children == null) { - // NN returns a null object if the directory cannot be found and has no - // listing. If we didn't retrieve any NN listing data, and there are no - // mount points here, return null. - return null; - } - - // Generate combined listing - HdfsFileStatus[] combinedData = new HdfsFileStatus[nnListing.size()]; - combinedData = nnListing.values().toArray(combinedData); - return new DirectoryListing(combinedData, remainingEntries[0]); - }); - }); - return asyncReturn(DirectoryListing.class); - } - - /** - * Get listing on remote locations. - */ - @Override - protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt( - String src, byte[] startAfter, boolean needLocation) throws IOException { - List<RemoteLocation> locations = - rpcServer.getLocationsForPath(src, false, false); - // Locate the dir and fetch the listing. - if (locations.isEmpty()) { - asyncComplete(new ArrayList<>()); - return asyncReturn(List.class); - } - asyncTry(() -> { - RemoteMethod method = new RemoteMethod("getListing", - new Class<?>[] {String.class, startAfter.getClass(), boolean.class}, - new RemoteParam(), startAfter, needLocation); - rpcClient.invokeConcurrent(locations, method, false, -1, - DirectoryListing.class); - }); - asyncCatch((CatchFunction<List, RouterResolveException>) (o, e) -> { - LOG.debug("Cannot get locations for {}, {}.", src, e.getMessage()); - LOG.info("Cannot get locations for {}, {}.", src, e.getMessage()); - return new ArrayList<>(); - }, RouterResolveException.class); - return asyncReturn(List.class); - } - - - @Override - public HdfsFileStatus getFileInfo(String src) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.READ); - - final IOException[] noLocationException = new IOException[1]; - asyncTry(() -> { - final List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, false, false); - RemoteMethod method = new RemoteMethod("getFileInfo", - new Class<?>[] {String.class}, new RemoteParam()); - // If it's a directory, we check in all locations - if (rpcServer.isPathAll(src)) { - getFileInfoAll(locations, method); - } else { - // Check for file information sequentially - rpcClient.invokeSequential(locations, method, HdfsFileStatus.class, null); - } - }); - asyncCatch((o, e) -> { - if (e instanceof NoLocationException - || e instanceof RouterResolveException) { - noLocationException[0] = e; - } - throw e; - }, IOException.class); - - asyncApply((AsyncApplyFunction<HdfsFileStatus, Object>) ret -> { - // If there is no real path, check mount points - if (ret == null) { - List<String> children = subclusterResolver.getMountPoints(src); - if (children != null && !children.isEmpty()) { - Map<String, Long> dates = getMountPointDates(src); - long date = 0; - if (dates != null && dates.containsKey(src)) { - date = dates.get(src); - } - getMountPointStatus(src, children.size(), date, false); - } else if (children != null) { - // The src is a mount point, but there are no files or directories - getMountPointStatus(src, 0, 0, false); - } else { - asyncComplete(null); - } - asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) result -> { - // Can't find mount point for path and the path didn't contain any sub monit points, - // throw the NoLocationException to client. - if (result == null && noLocationException[0] != null) { - throw noLocationException[0]; - } - - return result; - }); - } else { - asyncComplete(ret); - } - }); - - return asyncReturn(HdfsFileStatus.class); - } - - @Override - public RemoteLocation getFileRemoteLocation(String path) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.READ); - - final List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false); - if (locations.size() == 1) { - asyncComplete(locations.get(0)); - return asyncReturn(RemoteLocation.class); - } - - asyncForEach(locations.iterator(), (forEachRun, location) -> { - RemoteMethod method = - new RemoteMethod("getFileInfo", new Class<?>[] {String.class}, new RemoteParam()); - rpcClient.invokeSequential(Collections.singletonList(location), method, - HdfsFileStatus.class, null); - asyncApply((ApplyFunction<HdfsFileStatus, RemoteLocation>) ret -> { - if (ret != null) { - forEachRun.breakNow(); - return location; - } - return null; - }); - }); - - return asyncReturn(RemoteLocation.class); - } - - @Override - public HdfsFileStatus getMountPointStatus( - String name, int childrenNum, long date, boolean setPath) { - long modTime = date; - long accessTime = date; - final FsPermission[] permission = new FsPermission[]{FsPermission.getDirDefault()}; - final String[] owner = new String[]{this.superUser}; - final String[] group = new String[]{this.superGroup}; - final int[] childrenNums = new int[]{childrenNum}; - final EnumSet<HdfsFileStatus.Flags>[] flags = - new EnumSet[]{EnumSet.noneOf(HdfsFileStatus.Flags.class)}; - asyncComplete(null); - if (getSubclusterResolver() instanceof MountTableResolver) { - asyncTry(() -> { - String mName = name.startsWith("/") ? name : "/" + name; - MountTableResolver mountTable = (MountTableResolver) subclusterResolver; - MountTable entry = mountTable.getMountPoint(mName); - if (entry != null) { - permission[0] = entry.getMode(); - owner[0] = entry.getOwnerName(); - group[0] = entry.getGroupName(); - - RemoteMethod method = new RemoteMethod("getFileInfo", - new Class<?>[] {String.class}, new RemoteParam()); - getFileInfoAll( - entry.getDestinations(), method, mountStatusTimeOut); - asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) fInfo -> { - if (fInfo != null) { - permission[0] = fInfo.getPermission(); - owner[0] = fInfo.getOwner(); - group[0] = fInfo.getGroup(); - childrenNums[0] = fInfo.getChildrenNum(); - flags[0] = DFSUtil - .getFlags(fInfo.isEncrypted(), fInfo.isErasureCoded(), - fInfo.isSnapshotEnabled(), fInfo.hasAcl()); - } - return fInfo; - }); - } - }); - asyncCatch((CatchFunction<HdfsFileStatus, IOException>) (status, e) -> { - LOG.error("Cannot get mount point: {}", e.getMessage()); - return status; - }, IOException.class); - } else { - try { - UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); - owner[0] = ugi.getUserName(); - group[0] = ugi.getPrimaryGroupName(); - } catch (IOException e) { - String msg = "Cannot get remote user: " + e.getMessage(); - if (UserGroupInformation.isSecurityEnabled()) { - LOG.error(msg); - } else { - LOG.debug(msg); - } - } - } - long inodeId = 0; - HdfsFileStatus.Builder builder = new HdfsFileStatus.Builder(); - asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) status -> { - if (setPath) { - Path path = new Path(name); - String nameStr = path.getName(); - builder.path(DFSUtil.string2Bytes(nameStr)); - } - - return builder.isdir(true) - .mtime(modTime) - .atime(accessTime) - .perm(permission[0]) - .owner(owner[0]) - .group(group[0]) - .symlink(new byte[0]) - .fileId(inodeId) - .children(childrenNums[0]) - .flags(flags[0]) - .build(); - }); - return asyncReturn(HdfsFileStatus.class); - } - - @Override - protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations, - final RemoteMethod method, long timeOutMs) throws IOException { - - asyncComplete(null); - // Get the file info from everybody - rpcClient.invokeConcurrent(locations, method, false, false, timeOutMs, - HdfsFileStatus.class); - asyncApply(res -> { - Map<RemoteLocation, HdfsFileStatus> results = (Map<RemoteLocation, HdfsFileStatus>) res; - int children = 0; - // We return the first file - HdfsFileStatus dirStatus = null; - for (RemoteLocation loc : locations) { - HdfsFileStatus fileStatus = results.get(loc); - if (fileStatus != null) { - children += fileStatus.getChildrenNum(); - if (!fileStatus.isDirectory()) { - return fileStatus; - } else if (dirStatus == null) { - dirStatus = fileStatus; - } - } - } - if (dirStatus != null) { - return updateMountPointStatus(dirStatus, children); - } - return null; - }); - return asyncReturn(HdfsFileStatus.class); - } - - @Override - public boolean recoverLease(String src, String clientName) throws IOException { - super.recoverLease(src, clientName); - return asyncReturn(boolean.class); - } - - @Override - public long[] getStats() throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); - - RemoteMethod method = new RemoteMethod("getStats"); - Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent(nss, method, true, false, long[].class); - asyncApply(o -> { - Map<FederationNamespaceInfo, long[]> results - = (Map<FederationNamespaceInfo, long[]>) o; - long[] combinedData = new long[STATS_ARRAY_LENGTH]; - for (long[] data : results.values()) { - for (int i = 0; i < combinedData.length && i < data.length; i++) { - if (data[i] >= 0) { - combinedData[i] += data[i]; - } - } - } - return combinedData; - }); - return asyncReturn(long[].class); - } - - @Override - public ReplicatedBlockStats getReplicatedBlockStats() throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.READ); - - RemoteMethod method = new RemoteMethod("getReplicatedBlockStats"); - Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent(nss, method, true, - false, ReplicatedBlockStats.class); - asyncApply(o -> { - Map<FederationNamespaceInfo, ReplicatedBlockStats> ret = - (Map<FederationNamespaceInfo, ReplicatedBlockStats>) o; - return ReplicatedBlockStats.merge(ret.values()); - }); - return asyncReturn(ReplicatedBlockStats.class); - } - - @Override - public DatanodeInfo[] getDatanodeReport(HdfsConstants.DatanodeReportType type) - throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); - return rpcServer.getDatanodeReportAsync(type, true, 0); - } - - @Override - public DatanodeStorageReport[] getDatanodeStorageReport( - HdfsConstants.DatanodeReportType type) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); - - rpcServer.getDatanodeStorageReportMapAsync(type); - asyncApply((ApplyFunction< Map<String, DatanodeStorageReport[]>, DatanodeStorageReport[]>) - dnSubcluster -> mergeDtanodeStorageReport(dnSubcluster)); - return asyncReturn(DatanodeStorageReport[].class); - } - - public DatanodeStorageReport[] getDatanodeStorageReport( - HdfsConstants.DatanodeReportType type, boolean requireResponse, long timeOutMs) - throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); - - rpcServer.getDatanodeStorageReportMapAsync(type, requireResponse, timeOutMs); - asyncApply((ApplyFunction< Map<String, DatanodeStorageReport[]>, DatanodeStorageReport[]>) - dnSubcluster -> mergeDtanodeStorageReport(dnSubcluster)); - return asyncReturn(DatanodeStorageReport[].class); - } - - @Override - public boolean setSafeMode(HdfsConstants.SafeModeAction action, - boolean isChecked) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE); - - // Set safe mode in all the name spaces - RemoteMethod method = new RemoteMethod("setSafeMode", - new Class<?>[] {HdfsConstants.SafeModeAction.class, boolean.class}, - action, isChecked); - Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent( - nss, method, true, !isChecked, Boolean.class); - - asyncApply(o -> { - Map<FederationNamespaceInfo, Boolean> results - = (Map<FederationNamespaceInfo, Boolean>) o; - // We only report true if all the name space are in safe mode - int numSafemode = 0; - for (boolean safemode : results.values()) { - if (safemode) { - numSafemode++; - } - } - return numSafemode == results.size(); - }); - return asyncReturn(Boolean.class); - } - - @Override - public boolean saveNamespace(long timeWindow, long txGap) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); - - RemoteMethod method = new RemoteMethod("saveNamespace", - new Class<?>[] {long.class, long.class}, timeWindow, txGap); - final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent(nss, method, true, - false, boolean.class); - - asyncApply(o -> { - Map<FederationNamespaceInfo, Boolean> ret = - (Map<FederationNamespaceInfo, Boolean>) o; - boolean success = true; - for (boolean s : ret.values()) { - if (!s) { - success = false; - break; - } - } - return success; - }); - return asyncReturn(Boolean.class); - } - - @Override - public long rollEdits() throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE); - - RemoteMethod method = new RemoteMethod("rollEdits", new Class<?>[] {}); - final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent(nss, method, true, false, long.class); - asyncApply(o -> { - Map<FederationNamespaceInfo, Long> ret = - (Map<FederationNamespaceInfo, Long>) o; - // Return the maximum txid - long txid = 0; - for (long t : ret.values()) { - if (t > txid) { - txid = t; - } - } - return txid; - }); - return asyncReturn(long.class); - } - - @Override - public boolean restoreFailedStorage(String arg) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); - - RemoteMethod method = new RemoteMethod("restoreFailedStorage", - new Class<?>[] {String.class}, arg); - final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent(nss, method, true, false, Boolean.class); - asyncApply(o -> { - Map<FederationNamespaceInfo, Boolean> ret = - (Map<FederationNamespaceInfo, Boolean>) o; - boolean success = true; - for (boolean s : ret.values()) { - if (!s) { - success = false; - break; - } - } - return success; - }); - return asyncReturn(boolean.class); - } - - @Override - public RollingUpgradeInfo rollingUpgrade(HdfsConstants.RollingUpgradeAction action) - throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.READ); - - RemoteMethod method = new RemoteMethod("rollingUpgrade", - new Class<?>[] {HdfsConstants.RollingUpgradeAction.class}, action); - final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); - - rpcClient.invokeConcurrent( - nss, method, true, false, RollingUpgradeInfo.class); - asyncApply(o -> { - Map<FederationNamespaceInfo, RollingUpgradeInfo> ret = - (Map<FederationNamespaceInfo, RollingUpgradeInfo>) o; - // Return the first rolling upgrade info - RollingUpgradeInfo info = null; - for (RollingUpgradeInfo infoNs : ret.values()) { - if (info == null && infoNs != null) { - info = infoNs; - } - } - return info; - }); - return asyncReturn(RollingUpgradeInfo.class); - } - - @Override - public ContentSummary getContentSummary(String path) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.READ); - - // Get the summaries from regular files - final Collection<ContentSummary> summaries = new ArrayList<>(); - final List<RemoteLocation> locations = getLocationsForContentSummary(path); - final RemoteMethod method = new RemoteMethod("getContentSummary", - new Class<?>[] {String.class}, new RemoteParam()); - rpcClient.invokeConcurrent(locations, method, - false, -1, ContentSummary.class); - - asyncApply(o -> { - final List<RemoteResult<RemoteLocation, ContentSummary>> results = - (List<RemoteResult<RemoteLocation, ContentSummary>>) o; - - FileNotFoundException notFoundException = null; - for (RemoteResult<RemoteLocation, ContentSummary> result : results) { - if (result.hasException()) { - IOException ioe = result.getException(); - if (ioe instanceof FileNotFoundException) { - notFoundException = (FileNotFoundException)ioe; - } else if (!allowPartialList) { - throw ioe; - } - } else if (result.getResult() != null) { - summaries.add(result.getResult()); - } - } - - // Throw original exception if no original nor mount points - if (summaries.isEmpty() && notFoundException != null) { - throw notFoundException; - } - return aggregateContentSummary(summaries); - }); - - return asyncReturn(ContentSummary.class); - } - - @Override - public long getCurrentEditLogTxid() throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.READ); - - RemoteMethod method = new RemoteMethod( - "getCurrentEditLogTxid", new Class<?>[] {}); - final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent(nss, method, true, false, long.class); - - asyncApply(o -> { - Map<FederationNamespaceInfo, Long> ret = - (Map<FederationNamespaceInfo, Long>) o; - // Return the maximum txid - long txid = 0; - for (long t : ret.values()) { - if (t > txid) { - txid = t; - } - } - return txid; - }); - return asyncReturn(long.class); - } - - @Override - public void msync() throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.READ, true); - // Only msync to nameservices with observer reads enabled. - Set<FederationNamespaceInfo> allNamespaces = namenodeResolver.getNamespaces(); - RemoteMethod method = new RemoteMethod("msync"); - Set<FederationNamespaceInfo> namespacesEligibleForObserverReads = allNamespaces - .stream() - .filter(ns -> rpcClient.isNamespaceObserverReadEligible(ns.getNameserviceId())) - .collect(Collectors.toSet()); - if (namespacesEligibleForObserverReads.isEmpty()) { - asyncCompleteWith(CompletableFuture.completedFuture(null)); - return; - } - rpcClient.invokeConcurrent(namespacesEligibleForObserverReads, method); - } - - @Override - public boolean setReplication(String src, short replication) - throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE); - - List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("setReplication", - new Class<?>[] {String.class, short.class}, new RemoteParam(), - replication); - if (rpcServer.isInvokeConcurrent(src)) { - rpcClient.invokeConcurrent(locations, method, Boolean.class); - asyncApply(o -> { - Map<RemoteLocation, Boolean> results = (Map<RemoteLocation, Boolean>) o; - return !results.containsValue(false); - }); - } else { - rpcClient.invokeSequential(locations, method, Boolean.class, - Boolean.TRUE); - } - return asyncReturn(boolean.class); - } - - /** - * Checks if the path is a directory and is supposed to be present in all - * subclusters. - * @param src the source path - * @return true if the path is directory and is supposed to be present in all - * subclusters else false in all other scenarios. - * @throws IOException if unable to get the file status. - */ - @Override - public boolean isMultiDestDirectory(String src) throws IOException { - try { - if (rpcServer.isPathAll(src)) { - List<RemoteLocation> locations; - locations = rpcServer.getLocationsForPath(src, false, false); - RemoteMethod method = new RemoteMethod("getFileInfo", - new Class<?>[] {String.class}, new RemoteParam()); - rpcClient.invokeSequential(locations, - method, HdfsFileStatus.class, null); - CompletableFuture<Object> completableFuture = getCompletableFuture(); - completableFuture = completableFuture.thenApply(o -> { - HdfsFileStatus fileStatus = (HdfsFileStatus) o; - if (fileStatus != null) { - return fileStatus.isDirectory(); - } else { - LOG.debug("The destination {} doesn't exist.", src); - } - return false; - }); - asyncCompleteWith(completableFuture); - return asyncReturn(Boolean.class); - } - } catch (UnresolvedPathException e) { - LOG.debug("The destination {} is a symlink.", src); - } - asyncCompleteWith(CompletableFuture.completedFuture(false)); - return asyncReturn(Boolean.class); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java index 51a3a1b9c28..cc25516d59f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java @@ -56,7 +56,6 @@ public class RouterAsyncProtocolTestBase { private FileSystem routerFs; private RouterRpcServer routerRpcServer; private RouterRpcServer routerAsyncRpcServer; - protected static final String TEST_DIR_PATH = "/testdir"; @BeforeClass public static void setUpCluster() throws Exception { @@ -115,20 +114,19 @@ public class RouterAsyncProtocolTestBase { routerRpcServer.getRouterStateIdContext()); routerAsyncRpcServer = Mockito.spy(routerRpcServer); Mockito.when(routerAsyncRpcServer.getRPCClient()).thenReturn(asyncRpcClient); - Mockito.when(routerAsyncRpcServer.isAsync()).thenReturn(true); // Create mock locations MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver(); resolver.addLocation("/", ns0, "/"); FsPermission permission = new FsPermission("705"); - routerFs.mkdirs(new Path(TEST_DIR_PATH), permission); + routerFs.mkdirs(new Path("/testdir"), permission); } @After public void tearDown() throws IOException { // clear client context CallerContext.setCurrent(null); - boolean delete = routerFs.delete(new Path(TEST_DIR_PATH)); + boolean delete = routerFs.delete(new Path("/testdir")); assertTrue(delete); if (routerFs != null) { routerFs.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncClientProtocol.java deleted file mode 100644 index 96f957f93df..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncClientProtocol.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router.async; - -import org.apache.hadoop.crypto.CryptoProtocolVersion; -import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FsServerDefaults; -import org.apache.hadoop.fs.permission.AclEntry; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.DirectoryListing; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; -import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol; -import org.apache.hadoop.io.EnumSetWritable; -import org.apache.hadoop.util.Lists; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.List; - -import static org.apache.hadoop.crypto.CryptoProtocolVersion.ENCRYPTION_ZONES; -import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT; -import static org.apache.hadoop.fs.permission.AclEntryType.USER; -import static org.apache.hadoop.fs.permission.FsAction.ALL; -import static org.apache.hadoop.fs.permission.FsAction.NONE; -import static org.apache.hadoop.fs.permission.FsAction.READ; -import static org.apache.hadoop.fs.permission.FsAction.READ_WRITE; -import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; -import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -/** - * Used to test the functionality of {@link RouterAsyncClientProtocol}. - */ -public class TestRouterAsyncClientProtocol extends RouterAsyncProtocolTestBase { - private RouterAsyncClientProtocol asyncClientProtocol; - private RouterClientProtocol clientProtocol; - private final String testPath = TEST_DIR_PATH + "/test"; - - @Before - public void setup() throws IOException { - asyncClientProtocol = new RouterAsyncClientProtocol(getRouterConf(), getRouterAsyncRpcServer()); - clientProtocol = new RouterClientProtocol(getRouterConf(), getRouterRpcServer()); - } - - @Test - public void testGetServerDefaults() throws Exception { - FsServerDefaults serverDefaults = clientProtocol.getServerDefaults(); - asyncClientProtocol.getServerDefaults(); - FsServerDefaults fsServerDefaults = syncReturn(FsServerDefaults.class); - assertEquals(serverDefaults.getBlockSize(), fsServerDefaults.getBlockSize()); - assertEquals(serverDefaults.getReplication(), fsServerDefaults.getReplication()); - assertEquals(serverDefaults.getChecksumType(), fsServerDefaults.getChecksumType()); - assertEquals( - serverDefaults.getDefaultStoragePolicyId(), fsServerDefaults.getDefaultStoragePolicyId()); - } - - @Test - public void testClientProtocolRpc() throws Exception { - asyncClientProtocol.mkdirs(testPath, new FsPermission(ALL, ALL, ALL), false); - Boolean success = syncReturn(Boolean.class); - assertTrue(success); - - asyncClientProtocol.setPermission(testPath, new FsPermission(READ_WRITE, READ, NONE)); - syncReturn(Void.class); - - asyncClientProtocol.getFileInfo(testPath); - HdfsFileStatus hdfsFileStatus = syncReturn(HdfsFileStatus.class); - assertEquals(hdfsFileStatus.getPermission(), new FsPermission(READ_WRITE, READ, NONE)); - - List<AclEntry> aclSpec = Lists.newArrayList(aclEntry(DEFAULT, USER, "tmpUser", ALL)); - asyncClientProtocol.setAcl(testPath, aclSpec); - syncReturn(Void.class); - asyncClientProtocol.setOwner(testPath, "tmpUser", "tmpUserGroup"); - syncReturn(Void.class); - - asyncClientProtocol.getFileInfo(testPath); - hdfsFileStatus = syncReturn(HdfsFileStatus.class); - assertEquals("tmpUser", hdfsFileStatus.getOwner()); - assertEquals("tmpUserGroup", hdfsFileStatus.getGroup()); - - asyncClientProtocol.create(testPath + "/testCreate.file", - new FsPermission(ALL, ALL, ALL), "testAsyncClient", - new EnumSetWritable<>(EnumSet.of(CreateFlag.CREATE)), - false, (short) 1, 128 * 1024 * 1024L, - new CryptoProtocolVersion[]{ENCRYPTION_ZONES}, - null, null); - hdfsFileStatus = syncReturn(HdfsFileStatus.class); - assertTrue(hdfsFileStatus.isFile()); - assertEquals(128 * 1024 * 1024, hdfsFileStatus.getBlockSize()); - - asyncClientProtocol.getFileRemoteLocation(testPath); - RemoteLocation remoteLocation = syncReturn(RemoteLocation.class); - assertNotNull(remoteLocation); - assertEquals(getNs0(), remoteLocation.getNameserviceId()); - assertEquals(testPath, remoteLocation.getSrc()); - - asyncClientProtocol.getListing(testPath, new byte[1], true); - DirectoryListing directoryListing = syncReturn(DirectoryListing.class); - assertEquals(1, directoryListing.getPartialListing().length); - - asyncClientProtocol.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL); - DatanodeInfo[] datanodeInfos = syncReturn(DatanodeInfo[].class); - assertEquals(3, datanodeInfos.length); - - asyncClientProtocol.createSymlink(testPath + "/testCreate.file", - "/link/link.file", new FsPermission(ALL, ALL, ALL), true); - syncReturn(Void.class); - - asyncClientProtocol.getFileLinkInfo("/link/link.file"); - hdfsFileStatus = syncReturn(HdfsFileStatus.class); - assertEquals("testCreate.file", hdfsFileStatus.getSymlink().getName()); - - asyncClientProtocol.rename(testPath + "/testCreate.file", - testPath + "/testRename.file"); - success = syncReturn(boolean.class); - assertTrue(success); - - asyncClientProtocol.delete(testPath, true); - success = syncReturn(boolean.class); - assertTrue(success); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org