This is an automated email from the ASF dual-hosted git repository. keepromise pushed a commit to branch HDFS-17531 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/HDFS-17531 by this push: new d4a6a271385 HDFS-17672. [ARR] Move asynchronous related classes to the async package. (#7184). Contributed by Jian Zhang. d4a6a271385 is described below commit d4a6a271385314318bdfb7284134888d60740c67 Author: Jian Zhang <keeprom...@apache.org> AuthorDate: Thu Nov 28 20:13:11 2024 +0800 HDFS-17672. [ARR] Move asynchronous related classes to the async package. (#7184). Contributed by Jian Zhang. --- .../hdfs/protocolPB/AsyncRpcProtocolPBUtil.java | 8 +- .../hdfs/server/federation/router/Quota.java | 5 +- .../federation/router/RouterAdminServer.java | 2 +- .../router/RouterQuotaUpdateService.java | 2 +- .../server/federation/router/RouterRpcClient.java | 18 +- .../server/federation/router/RouterRpcServer.java | 31 ++-- .../federation/router/RouterStateIdContext.java | 13 +- .../router/{ => async}/AsyncErasureCoding.java | 83 ++++++++- .../federation/router/{ => async}/AsyncQuota.java | 22 ++- .../router/{ => async}/RouterAsyncCacheAdmin.java | 42 ++++- .../async/RouterAsyncNamenodeProtocol.java | 10 +- .../router/{ => async}/RouterAsyncRpcClient.java | 42 +++-- .../router/{ => async}/RouterAsyncSnapshot.java | 69 +++++++- .../{ => async}/RouterAsyncStoragePolicy.java | 31 +++- .../async/RouterAsyncUserProtocol.java | 13 +- .../federation/router/async/package-info.java | 4 - .../router/async/{ => utils}/ApplyFunction.java | 4 +- .../federation/router/async/{ => utils}/Async.java | 2 +- .../async/{ => utils}/AsyncApplyFunction.java | 4 +- .../router/async/{ => utils}/AsyncBiFunction.java | 2 +- .../async/{ => utils}/AsyncCatchFunction.java | 6 +- .../router/async/{ => utils}/AsyncForEachRun.java | 4 +- .../router/async/{ => utils}/AsyncRun.java | 2 +- .../router/async/{ => utils}/AsyncUtil.java | 6 +- .../router/async/{ => utils}/CatchFunction.java | 8 +- .../router/async/{ => utils}/FinallyFunction.java | 4 +- .../async/utils}/package-info.java | 6 +- .../protocolPB/TestAsyncRpcProtocolPBUtil.java | 4 +- .../TestRouterClientSideTranslatorPB.java | 2 +- .../router/TestRouterAsyncCacheAdmin.java | 195 --------------------- .../router/TestRouterAsyncRpcServer.java | 191 -------------------- .../router/TestRouterAsyncStoragePolicy.java | 163 ----------------- .../async/RouterAsyncProtocolTestBase.java | 3 +- .../router/async/TestRouterAsyncCacheAdmin.java | 102 +++++++++++ .../{ => async}/TestRouterAsyncErasureCoding.java | 6 +- .../async/TestRouterAsyncNamenodeProtocol.java | 4 +- .../router/{ => async}/TestRouterAsyncQuota.java | 6 +- .../{ => async}/TestRouterAsyncRpcClient.java | 9 +- .../router/async/TestRouterAsyncRpcServer.java | 96 ++++++++++ .../{ => async}/TestRouterAsyncSnapshot.java | 114 ++---------- .../router/async/TestRouterAsyncStoragePolicy.java | 66 +++++++ .../async/TestRouterAsyncUserProtocol.java | 4 +- .../router/async/{ => utils}/AsyncClass.java | 20 +-- .../router/async/{ => utils}/BaseClass.java | 2 +- .../router/async/{ => utils}/SyncClass.java | 2 +- .../router/async/{ => utils}/TestAsyncUtil.java | 2 +- .../security/TestRouterSecurityManager.java | 2 - 47 files changed, 649 insertions(+), 787 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java index fa798e2f358..d04a61e6862 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.protocolPB; import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext; -import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufRpcEngine2; @@ -32,9 +32,9 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn; import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java index f28af6afa7b..2c4bcc92b47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java @@ -139,7 +139,7 @@ public class Quota { * @return quota usage for each remote location. * @throws IOException If the quota system is disabled. */ - Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path) + protected Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path) throws IOException { rpcServer.checkOperation(OperationCategory.READ); if (!router.isQuotaEnabled()) { @@ -252,8 +252,9 @@ public class Quota { * @param path Federation path of the results. * @param results Quota query result. * @return Aggregated Quota. + * @throws IOException If the quota system is disabled. */ - QuotaUsage aggregateQuota(String path, + protected QuotaUsage aggregateQuota(String path, Map<RemoteLocation, QuotaUsage> results) throws IOException { long nsCount = 0; long ssCount = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java index a462b7a5f73..d9cacf2e75e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java @@ -21,7 +21,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY; import static org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler.HANDLER_IDENTIFIER; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import java.io.IOException; import java.net.InetSocketAddress; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java index 235190d2a48..124d55586d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java @@ -41,7 +41,7 @@ import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; /** * Service to periodically update the {@link RouterQuotaUsage} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 70b5034272d..e07de092dd7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1940,7 +1940,7 @@ public class RouterRpcClient { * @return A prioritized list of NNs to use for communication. * @throws IOException If a NN cannot be located for the nameservice ID. */ - protected List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsId, + public List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsId, boolean isObserverRead) throws IOException { final List<? extends FederationNamenodeContext> namenodes; @@ -2047,39 +2047,39 @@ public class RouterRpcClient { private static final byte SHOULD_USE_OBSERVER_BIT = 2; private static final byte COMPLETE_BIT = 4; - ExecutionStatus() { + public ExecutionStatus() { this(false, false); } - ExecutionStatus(boolean failOver, boolean shouldUseObserver) { + public ExecutionStatus(boolean failOver, boolean shouldUseObserver) { this.flag = 0; setFailOver(failOver); setShouldUseObserver(shouldUseObserver); setComplete(false); } - private void setFailOver(boolean failOver) { + public void setFailOver(boolean failOver) { flag = (byte) (failOver ? (flag | FAIL_OVER_BIT) : (flag & ~FAIL_OVER_BIT)); } - private void setShouldUseObserver(boolean shouldUseObserver) { + public void setShouldUseObserver(boolean shouldUseObserver) { flag = (byte) (shouldUseObserver ? (flag | SHOULD_USE_OBSERVER_BIT) : (flag & ~SHOULD_USE_OBSERVER_BIT)); } - void setComplete(boolean complete) { + public void setComplete(boolean complete) { flag = (byte) (complete ? (flag | COMPLETE_BIT) : (flag & ~COMPLETE_BIT)); } - boolean isFailOver() { + public boolean isFailOver() { return (flag & FAIL_OVER_BIT) != 0; } - boolean isShouldUseObserver() { + public boolean isShouldUseObserver() { return (flag & SHOULD_USE_OBSERVER_BIT) != 0; } - boolean isComplete() { + public boolean isComplete() { return (flag & COMPLETE_BIT) != 0; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 6fb189b0e01..39a50d4e3a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -37,12 +37,12 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_R import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry; import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI; import java.io.FileNotFoundException; @@ -75,9 +75,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil; -import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; -import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction; -import org.apache.hadoop.hdfs.server.federation.router.async.CatchFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncRpcClient; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction; import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder; import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader; import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache; @@ -686,7 +687,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, * client requests. * @throws UnsupportedOperationException If the operation is not supported. */ - void checkOperation(OperationCategory op, boolean supported) + public void checkOperation(OperationCategory op, boolean supported) throws StandbyException, UnsupportedOperationException { checkOperation(op); @@ -1032,7 +1033,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, * @return The remote location for this file. * @throws IOException If the file has no creation location. */ - RemoteLocation getCreateLocationAsync( + public RemoteLocation getCreateLocationAsync( final String src, final List<RemoteLocation> locations) throws IOException { @@ -1995,7 +1996,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, * @return Prioritized list of locations in the federated cluster. * @throws IOException If the location for this path cannot be determined. */ - protected List<RemoteLocation> getLocationsForPath(String path, + public List<RemoteLocation> getLocationsForPath(String path, boolean failIfLocked) throws IOException { return getLocationsForPath(path, failIfLocked, true); } @@ -2010,7 +2011,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, * @return Prioritized list of locations in the federated cluster. * @throws IOException If the location for this path cannot be determined. */ - protected List<RemoteLocation> getLocationsForPath(String path, + public List<RemoteLocation> getLocationsForPath(String path, boolean failIfLocked, boolean needQuotaVerify) throws IOException { try { if (failIfLocked) { @@ -2227,9 +2228,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, * mount entry. * @param path The path on which the operation need to be invoked. * @return true if the call is supposed to invoked on all locations. - * @throws IOException + * @throws IOException If an I/O error occurs. */ - boolean isInvokeConcurrent(final String path) throws IOException { + public boolean isInvokeConcurrent(final String path) throws IOException { if (subclusterResolver instanceof MountTableResolver) { MountTableResolver mountTableResolver = (MountTableResolver) subclusterResolver; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java index e239e5e9059..21e3f16f206 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java @@ -48,7 +48,7 @@ import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException; */ @InterfaceAudience.Private @InterfaceStability.Evolving -class RouterStateIdContext implements AlignmentContext { +public class RouterStateIdContext implements AlignmentContext { private final HashSet<String> coordinatedMethods; /** @@ -93,6 +93,8 @@ class RouterStateIdContext implements AlignmentContext { /** * Adds the {@link #namespaceIdMap} to the response header that will be sent to a client. + * + * @param headerBuilder the response header that will be sent to a client. */ public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder) { if (namespaceIdMap.isEmpty()) { @@ -110,7 +112,8 @@ class RouterStateIdContext implements AlignmentContext { } public LongAccumulator getNamespaceStateId(String nsId) { - return namespaceIdMap.computeIfAbsent(nsId, key -> new LongAccumulator(Math::max, Long.MIN_VALUE)); + return namespaceIdMap.computeIfAbsent(nsId, + key -> new LongAccumulator(Math::max, Long.MIN_VALUE)); } public List<String> getNamespaces() { @@ -127,6 +130,9 @@ class RouterStateIdContext implements AlignmentContext { /** * Utility function to parse routerFederatedState field in RPC headers. + * + * @param byteString the byte string of routerFederatedState. + * @return the router federated state map. */ public static Map<String, Long> getRouterFederatedStateMap(ByteString byteString) { if (byteString != null) { @@ -148,7 +154,8 @@ class RouterStateIdContext implements AlignmentContext { if (call != null) { ByteString callFederatedNamespaceState = call.getFederatedNamespaceState(); if (callFederatedNamespaceState != null) { - Map<String, Long> clientFederatedStateIds = getRouterFederatedStateMap(callFederatedNamespaceState); + Map<String, Long> clientFederatedStateIds = + getRouterFederatedStateMap(callFederatedNamespaceState); clientStateID = clientFederatedStateIds.getOrDefault(nsId, Long.MIN_VALUE); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncErasureCoding.java similarity index 66% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncErasureCoding.java index 9f1dbe5f2c4..d40c6a1a93a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncErasureCoding.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router; +package org.apache.hadoop.hdfs.server.federation.router.async; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats; @@ -25,7 +25,12 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; -import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; +import org.apache.hadoop.hdfs.server.federation.router.ErasureCoding; +import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; +import org.apache.hadoop.hdfs.server.federation.router.RemoteParam; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction; import org.apache.hadoop.hdfs.server.namenode.NameNode; import java.io.IOException; @@ -36,9 +41,15 @@ import java.util.Map; import java.util.Set; import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn; +/** + * Provides asynchronous operations for erasure coding in HDFS Federation. + * This class extends {@link org.apache.hadoop.hdfs.server.federation.router.ErasureCoding} + * and overrides its methods to perform erasure coding operations in a non-blocking manner, + * allowing for concurrent execution and improved performance. + */ public class AsyncErasureCoding extends ErasureCoding { /** RPC server to receive client calls. */ private final RouterRpcServer rpcServer; @@ -54,6 +65,17 @@ public class AsyncErasureCoding extends ErasureCoding { this.namenodeResolver = this.rpcClient.getNamenodeResolver(); } + /** + * Asynchronously get an array of all erasure coding policies. + * This method checks the operation category and then invokes the + * getErasureCodingPolicies method concurrently across all namespaces. + * <p> + * The results are merged and returned as an array of ErasureCodingPolicyInfo. + * + * @return Array of ErasureCodingPolicyInfo. + * @throws IOException If an I/O error occurs. + */ + @Override public ErasureCodingPolicyInfo[] getErasureCodingPolicies() throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); @@ -70,6 +92,16 @@ public class AsyncErasureCoding extends ErasureCoding { return asyncReturn(ErasureCodingPolicyInfo[].class); } + /** + * Asynchronously get the erasure coding codecs available. + * This method checks the operation category and then invokes the + * getErasureCodingCodecs method concurrently across all namespaces. + * <p> + * The results are merged into a single map of codec names to codec properties. + * + * @return Map of erasure coding codecs. + * @throws IOException If an I/O error occurs. + */ @Override public Map<String, String> getErasureCodingCodecs() throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); @@ -97,6 +129,17 @@ public class AsyncErasureCoding extends ErasureCoding { return asyncReturn(Map.class); } + /** + * Asynchronously add an array of erasure coding policies. + * This method checks the operation category and then invokes the + * addErasureCodingPolicies method concurrently across all namespaces. + * <p> + * The results are merged and returned as an array of AddErasureCodingPolicyResponse. + * + * @param policies Array of erasure coding policies to add. + * @return Array of AddErasureCodingPolicyResponse. + * @throws IOException If an I/O error occurs. + */ @Override public AddErasureCodingPolicyResponse[] addErasureCodingPolicies( ErasureCodingPolicy[] policies) throws IOException { @@ -117,6 +160,17 @@ public class AsyncErasureCoding extends ErasureCoding { return asyncReturn(AddErasureCodingPolicyResponse[].class); } + /** + * Asynchronously get the erasure coding policy for a given source path. + * This method checks the operation category and then invokes the + * getErasureCodingPolicy method sequentially for the given path. + * <p> + * The result is returned as an ErasureCodingPolicy object. + * + * @param src Source path to get the erasure coding policy for. + * @return ErasureCodingPolicy for the given path. + * @throws IOException If an I/O error occurs. + */ @Override public ErasureCodingPolicy getErasureCodingPolicy(String src) throws IOException { @@ -136,6 +190,17 @@ public class AsyncErasureCoding extends ErasureCoding { return asyncReturn(ErasureCodingPolicy.class); } + /** + * Asynchronously get the EC topology result for the given policies. + * This method checks the operation category and then invokes the + * getECTopologyResultForPolicies method concurrently across all namespaces. + * <p> + * The results are merged and the first unsupported result is returned. + * + * @param policyNames Array of policy names to check. + * @return ECTopologyVerifierResult for the policies. + * @throws IOException If an I/O error occurs. + */ @Override public ECTopologyVerifierResult getECTopologyResultForPolicies( String[] policyNames) throws IOException { @@ -162,6 +227,16 @@ public class AsyncErasureCoding extends ErasureCoding { return asyncReturn(ECTopologyVerifierResult.class); } + /** + * Asynchronously get the erasure coding block group statistics. + * This method checks the operation category and then invokes the + * getECBlockGroupStats method concurrently across all namespaces. + * <p> + * The results are merged and returned as an ECBlockGroupStats object. + * + * @return ECBlockGroupStats for the erasure coding block groups. + * @throws IOException If an I/O error occurs. + */ @Override public ECBlockGroupStats getECBlockGroupStats() throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncQuota.java similarity index 76% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncQuota.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncQuota.java index 5d76171a548..0980cf2a2fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncQuota.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncQuota.java @@ -15,10 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router; +package org.apache.hadoop.hdfs.server.federation.router.async; import org.apache.hadoop.fs.QuotaUsage; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.router.Quota; +import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; +import org.apache.hadoop.hdfs.server.federation.router.RemoteParam; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; import org.apache.hadoop.hdfs.server.namenode.NameNode; import java.io.IOException; @@ -26,9 +32,15 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletionException; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn; +/** + * Provides asynchronous operations for managing quotas in HDFS Federation. + * This class extends {@link org.apache.hadoop.hdfs.server.federation.router.Quota} + * and overrides its methods to perform quota operations in a non-blocking manner, + * allowing for concurrent execution and improved performance. + */ public class AsyncQuota extends Quota { /** RPC server to receive client calls. */ @@ -50,6 +62,7 @@ public class AsyncQuota extends Quota { * @return Aggregated quota. * @throws IOException If the quota system is disabled. */ + @Override public QuotaUsage getQuotaUsage(String path) throws IOException { getEachQuotaUsage(path); @@ -70,7 +83,8 @@ public class AsyncQuota extends Quota { * @return quota usage for each remote location. * @throws IOException If the quota system is disabled. */ - Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path) + @Override + protected Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); if (!router.isQuotaEnabled()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncCacheAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncCacheAdmin.java similarity index 61% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncCacheAdmin.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncCacheAdmin.java index fca43e15879..20ec36c935c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncCacheAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncCacheAdmin.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router; +package org.apache.hadoop.hdfs.server.federation.router.async; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; @@ -25,14 +25,16 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; -import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; +import org.apache.hadoop.hdfs.server.federation.router.RouterCacheAdmin; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction; import java.io.IOException; import java.util.EnumSet; import java.util.Map; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn; /** * Module that implements all the asynchronous RPC calls in @@ -45,6 +47,17 @@ public class RouterAsyncCacheAdmin extends RouterCacheAdmin { super(server); } + /** + * Asynchronously adds a new cache directive with the given path and flags. + * This method invokes the addCacheDirective method concurrently across all + * namespaces, and returns the first response as a long value representing the + * directive ID. + * + * @param path The cache directive path. + * @param flags The cache flags. + * @return The ID of the newly added cache directive. + * @throws IOException If an I/O error occurs. + */ @Override public long addCacheDirective( CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException { @@ -54,6 +67,17 @@ public class RouterAsyncCacheAdmin extends RouterCacheAdmin { return asyncReturn(Long.class); } + /** + * Asynchronously lists cache directives based on the provided previous ID and filter. + * This method invokes the listCacheDirectives method concurrently across all + * namespaces, and returns the first response as a BatchedEntries object containing + * the cache directive entries. + * + * @param prevId The previous ID from which to start listing. + * @param filter The filter to apply to the cache directives. + * @return BatchedEntries of cache directive entries. + * @throws IOException If an I/O error occurs. + */ @Override public BatchedEntries<CacheDirectiveEntry> listCacheDirectives( long prevId, CacheDirectiveInfo filter) throws IOException { @@ -64,6 +88,16 @@ public class RouterAsyncCacheAdmin extends RouterCacheAdmin { return asyncReturn(BatchedEntries.class); } + /** + * Asynchronously lists cache pools starting from the provided key. + * This method invokes the listCachePools method concurrently across all namespaces, + * and returns the first response as a BatchedEntries object containing the cache + * pool entries. + * + * @param prevKey The previous key from which to start listing. + * @return BatchedEntries of cache pool entries. + * @throws IOException If an I/O error occurs. + */ @Override public BatchedEntries<CachePoolEntry> listCachePools(String prevKey) throws IOException { invokeListCachePools(prevKey); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncNamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncNamenodeProtocol.java similarity index 96% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncNamenodeProtocol.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncNamenodeProtocol.java index fe05a57b854..fc461dd22af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncNamenodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncNamenodeProtocol.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.async; +package org.apache.hadoop.hdfs.server.federation.router.async; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -25,7 +25,7 @@ import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; import org.apache.hadoop.hdfs.server.federation.router.RouterNamenodeProtocol; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; -import org.apache.hadoop.hdfs.server.federation.router.async.AsyncApplyFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction; import org.apache.hadoop.hdfs.server.namenode.NNStorage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; @@ -35,9 +35,9 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import java.io.IOException; import java.util.Map; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn; /** * Module that implements all the asynchronous RPC calls in {@link NamenodeProtocol} in the diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java similarity index 94% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncRpcClient.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java index 2bdcd7ce287..249b7e1c82a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router; +package org.apache.hadoop.hdfs.server.federation.router.async; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.NameNodeProxiesClient; @@ -24,9 +24,19 @@ import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; -import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; -import org.apache.hadoop.hdfs.server.federation.router.async.AsyncApplyFunction; -import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction; +import org.apache.hadoop.hdfs.server.federation.router.ConnectionContext; +import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext; +import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; +import org.apache.hadoop.hdfs.server.federation.router.RemoteResult; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.router.RouterStateIdContext; +import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.UserGroupInformation; @@ -50,18 +60,18 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; -import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApplyUseExecutor; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncFinally; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncThrowException; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.getCompletableFuture; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApplyUseExecutor; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncFinally; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncThrowException; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.getCompletableFuture; /** * The {@code RouterAsyncRpcClient} class extends the functionality of the base diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncSnapshot.java similarity index 73% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncSnapshot.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncSnapshot.java index 8d830b84271..c38d243aa38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncSnapshot.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router; +package org.apache.hadoop.hdfs.server.federation.router.async; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.ClientProtocol; @@ -26,7 +26,13 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; -import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; +import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; +import org.apache.hadoop.hdfs.server.federation.router.RemoteParam; +import org.apache.hadoop.hdfs.server.federation.router.RemoteResult; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.router.RouterSnapshot; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction; import org.apache.hadoop.hdfs.server.namenode.NameNode; import java.io.IOException; @@ -35,8 +41,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn; /** * Module that implements all the asynchronous RPC calls related to snapshots in @@ -57,6 +63,16 @@ public class RouterAsyncSnapshot extends RouterSnapshot { this.namenodeResolver = rpcServer.getNamenodeResolver(); } + /** + * Asynchronously creates a snapshot with the given root and name. + * This method checks the operation category and then invokes the createSnapshot + * method concurrently across all namespaces, returning the first successful response. + * + * @param snapshotRoot The root path of the snapshot. + * @param snapshotName The name of the snapshot. + * @return The path of the created snapshot. + * @throws IOException If an I/O error occurs. + */ @Override public String createSnapshot(String snapshotRoot, String snapshotName) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.WRITE); @@ -89,6 +105,15 @@ public class RouterAsyncSnapshot extends RouterSnapshot { return asyncReturn(String.class); } + /** + * Asynchronously get an array of snapshottable directory listings. + * This method checks the operation category and then invokes the + * getSnapshottableDirListing method concurrently across all namespaces, merging + * the results into a single array. + * + * @return Array of SnapshottableDirectoryStatus. + * @throws IOException If an I/O error occurs. + */ @Override public SnapshottableDirectoryStatus[] getSnapshottableDirListing() throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); @@ -103,6 +128,16 @@ public class RouterAsyncSnapshot extends RouterSnapshot { return asyncReturn(SnapshottableDirectoryStatus[].class); } + /** + * Asynchronously get an array of snapshot listings for the given snapshot root. + * This method checks the operation category and then invokes the + * getSnapshotListing method, either sequentially or concurrently based on the + * configuration, and returns the merged results. + * + * @param snapshotRoot The root path of the snapshots. + * @return Array of SnapshotStatus. + * @throws IOException If an I/O error occurs. + */ @Override public SnapshotStatus[] getSnapshotListing(String snapshotRoot) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); @@ -145,6 +180,18 @@ public class RouterAsyncSnapshot extends RouterSnapshot { return asyncReturn(SnapshotStatus[].class); } + /** + * Asynchronously get a snapshot diff report for the given root and snapshot names. + * This method checks the operation category and then invokes the + * getSnapshotDiffReport method, either sequentially or concurrently based on the + * configuration, and returns the result. + * + * @param snapshotRoot The root path of the snapshot. + * @param earlierSnapshotName The name of the earlier snapshot. + * @param laterSnapshotName The name of the later snapshot. + * @return SnapshotDiffReport for the snapshots. + * @throws IOException If an I/O error occurs. + */ @Override public SnapshotDiffReport getSnapshotDiffReport( String snapshotRoot, String earlierSnapshotName, @@ -169,6 +216,20 @@ public class RouterAsyncSnapshot extends RouterSnapshot { } } + /** + * Asynchronously get a snapshot diff report listing for the given root and snapshot names. + * This method checks the operation category and then invokes the + * getSnapshotDiffReportListing method, either sequentially or concurrently based + * on the configuration, and returns the result. + * + * @param snapshotRoot The root path of the snapshot. + * @param earlierSnapshotName The name of the earlier snapshot. + * @param laterSnapshotName The name of the later snapshot. + * @param startPath The starting path for the diff report. + * @param index The index for the diff report listing. + * @return SnapshotDiffReportListing for the snapshots. + * @throws IOException If an I/O error occurs. + */ @Override public SnapshotDiffReportListing getSnapshotDiffReportListing( String snapshotRoot, String earlierSnapshotName, String laterSnapshotName, diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncStoragePolicy.java similarity index 64% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncStoragePolicy.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncStoragePolicy.java index 7e019e13bbe..cf23d0f7cc8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncStoragePolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncStoragePolicy.java @@ -15,17 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router; +package org.apache.hadoop.hdfs.server.federation.router.async; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; +import org.apache.hadoop.hdfs.server.federation.router.RemoteParam; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.router.RouterStoragePolicy; import org.apache.hadoop.hdfs.server.namenode.NameNode; import java.io.IOException; import java.util.List; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn; +/** + * Module that implements all the asynchronous RPC calls in + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} related to + * Storage Policy in the {@link RouterRpcServer}. + */ public class RouterAsyncStoragePolicy extends RouterStoragePolicy { /** RPC server to receive client calls. */ private final RouterRpcServer rpcServer; @@ -38,6 +48,15 @@ public class RouterAsyncStoragePolicy extends RouterStoragePolicy { this.rpcClient = this.rpcServer.getRPCClient(); } + /** + * Asynchronously get the storage policy for a given path. + * This method checks the operation category and then invokes the + * getStoragePolicy method sequentially for the given path. + * + * @param path The path for which to retrieve the storage policy. + * @return The BlockStoragePolicy for the given path. + * @throws IOException If an I/O error occurs. + */ @Override public BlockStoragePolicy getStoragePolicy(String path) throws IOException { @@ -52,6 +71,14 @@ public class RouterAsyncStoragePolicy extends RouterStoragePolicy { return asyncReturn(BlockStoragePolicy.class); } + /** + * Asynchronously get an array of all available storage policies. + * This method checks the operation category and then invokes the + * getStoragePolicies method across all available namespaces. + * + * @return An array of BlockStoragePolicy. + * @throws IOException If an I/O error occurs. + */ @Override public BlockStoragePolicy[] getStoragePolicies() throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncUserProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncUserProtocol.java similarity index 95% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncUserProtocol.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncUserProtocol.java index 3e03a9bdd5c..68b5aa85284 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncUserProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncUserProtocol.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.async; +package org.apache.hadoop.hdfs.server.federation.router.async; import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient; @@ -27,7 +27,7 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; -import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.RefreshUserMappingsProtocol; @@ -38,9 +38,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn; /** * Module that implements all the asynchronous RPC calls in @@ -67,6 +67,7 @@ public class RouterAsyncUserProtocol extends RouterUserProtocol { /** * Asynchronously refresh user to group mappings. + * * @throws IOException raised on errors performing I/O. */ @Override @@ -86,6 +87,7 @@ public class RouterAsyncUserProtocol extends RouterUserProtocol { /** * Asynchronously refresh superuser proxy group list. + * * @throws IOException raised on errors performing I/O. */ @Override @@ -105,6 +107,7 @@ public class RouterAsyncUserProtocol extends RouterUserProtocol { /** * Asynchronously get the groups which are mapped to the given user. + * * @param user The user to get the groups for. * @return The set of groups the user belongs to. * @throws IOException raised on errors performing I/O. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/package-info.java index 48fd0ad89ab..e0ecc4b36f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/package-info.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/package-info.java @@ -21,10 +21,6 @@ * Distributed File System (HDFS) Federation router. These classes are designed to work with * the Hadoop ecosystem, providing utilities and interfaces to perform non-blocking tasks that * can improve the performance and responsiveness of HDFS operations. - * - * <p>These classes work together to enable complex asynchronous workflows, making it easier to - * write code that can handle long-running tasks without blocking, thus improving the overall - * efficiency and scalability of HDFS operations.</p> */ @InterfaceAudience.Private @InterfaceStability.Evolving diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/ApplyFunction.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/ApplyFunction.java similarity index 96% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/ApplyFunction.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/ApplyFunction.java index ac3d5edb635..721adf1e4d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/ApplyFunction.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/ApplyFunction.java @@ -15,13 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router.async; +package org.apache.hadoop.hdfs.server.federation.router.async.utils; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException; /** * Represents a function that accepts a value of type T and produces a result of type R. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/Async.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/Async.java similarity index 98% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/Async.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/Async.java index e184bffaeef..09cd649b7e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/Async.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/Async.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router.async; +package org.apache.hadoop.hdfs.server.federation.router.async.utils; import java.io.IOException; import java.util.concurrent.CompletableFuture; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncApplyFunction.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncApplyFunction.java similarity index 98% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncApplyFunction.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncApplyFunction.java index b34a6c479cd..0355efabbc9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncApplyFunction.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncApplyFunction.java @@ -15,13 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router.async; +package org.apache.hadoop.hdfs.server.federation.router.async.utils; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException; /** * The AsyncApplyFunction interface represents a function that diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncBiFunction.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncBiFunction.java similarity index 98% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncBiFunction.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncBiFunction.java index 3e94736e7f1..8e7ac35726b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncBiFunction.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncBiFunction.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router.async; +package org.apache.hadoop.hdfs.server.federation.router.async.utils; import java.io.IOException; import java.util.concurrent.CompletableFuture; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncCatchFunction.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncCatchFunction.java similarity index 98% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncCatchFunction.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncCatchFunction.java index 01fcc44a19f..714f57827ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncCatchFunction.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncCatchFunction.java @@ -15,13 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router.async; +package org.apache.hadoop.hdfs.server.federation.router.async.utils; import java.io.IOException; import java.util.concurrent.CompletableFuture; -import static org.apache.hadoop.hdfs.server.federation.router.async.Async.unWarpCompletionException; -import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.unWarpCompletionException; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException; /** * The AsyncCatchFunction interface represents a function that handles exceptions diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncForEachRun.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncForEachRun.java similarity index 98% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncForEachRun.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncForEachRun.java index 322242d1c49..ae984a84b46 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncForEachRun.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncForEachRun.java @@ -15,14 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router.async; +package org.apache.hadoop.hdfs.server.federation.router.async.utils; import java.io.IOException; import java.util.Iterator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException; /** * The AsyncForEachRun class is part of the asynchronous operation utilities diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncRun.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncRun.java similarity index 97% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncRun.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncRun.java index 03d39f36d7d..e619a026e19 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncRun.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncRun.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router.async; +package org.apache.hadoop.hdfs.server.federation.router.async.utils; import java.io.IOException; import java.util.concurrent.CompletableFuture; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncUtil.java similarity index 99% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncUtil.java index 834741b52db..79ae88f6bcb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncUtil.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router.async; +package org.apache.hadoop.hdfs.server.federation.router.async.utils; import java.io.IOException; import java.util.Collection; @@ -25,8 +25,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.function.Function; -import static org.apache.hadoop.hdfs.server.federation.router.async.Async.CUR_COMPLETABLE_FUTURE; -import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.CUR_COMPLETABLE_FUTURE; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException; /** * The AsyncUtil class provides a collection of utility methods to simplify diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/CatchFunction.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/CatchFunction.java similarity index 96% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/CatchFunction.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/CatchFunction.java index fbb0af56ce0..a87ddf18aae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/CatchFunction.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/CatchFunction.java @@ -15,13 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router.async; +package org.apache.hadoop.hdfs.server.federation.router.async.utils; import java.io.IOException; import java.util.concurrent.CompletableFuture; -import static org.apache.hadoop.hdfs.server.federation.router.async.Async.unWarpCompletionException; -import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.unWarpCompletionException; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException; /** * The {@code CatchFunction} interface represents a function that handles exceptions @@ -58,7 +58,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCo */ @FunctionalInterface public interface CatchFunction<R, E extends Throwable> - extends Async<R>{ + extends Async<R> { /** * Applies this catch function to the given result and exception. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/FinallyFunction.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/FinallyFunction.java similarity index 97% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/FinallyFunction.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/FinallyFunction.java index 0243f0a0a1a..671d380ac8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/FinallyFunction.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/FinallyFunction.java @@ -15,12 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router.async; +package org.apache.hadoop.hdfs.server.federation.router.async.utils; import java.io.IOException; import java.util.concurrent.CompletableFuture; -import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException; /** * The {@code FinallyFunction} interface represents a function that is used to perform diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/package-info.java similarity index 80% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/package-info.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/package-info.java index 36e0513bb6a..5ffbebf9e71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/package-info.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/package-info.java @@ -21,11 +21,15 @@ * Distributed File System (HDFS) Federation router. These classes are designed to work with * the Hadoop ecosystem, providing utilities and interfaces to perform non-blocking tasks that * can improve the performance and responsiveness of HDFS operations. + * + * <p>These classes work together to enable complex asynchronous workflows, making it easier to + * write code that can handle long-running tasks without blocking, thus improving the overall + * efficiency and scalability of HDFS operations.</p> */ @InterfaceAudience.Private @InterfaceStability.Evolving -package org.apache.hadoop.hdfs.server.federation.async; +package org.apache.hadoop.hdfs.server.federation.router.async.utils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java index 15da20fdd11..9b88e3b9956 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.protocolPB; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufRpcEngine2; import org.apache.hadoop.ipc.RPC; @@ -42,7 +42,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.ForkJoinPool; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java index 3519a968c5b..637a4b38ae7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java @@ -62,7 +62,7 @@ import static org.apache.hadoop.fs.permission.FsAction.ALL; import static org.apache.hadoop.fs.permission.FsAction.NONE; import static org.apache.hadoop.fs.permission.FsAction.READ; import static org.apache.hadoop.fs.permission.FsAction.READ_WRITE; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncCacheAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncCacheAdmin.java deleted file mode 100644 index 4ad5ccbfee6..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncCacheAdmin.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.federation.router; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CacheFlag; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; -import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; -import org.apache.hadoop.hdfs.protocol.CachePoolEntry; -import org.apache.hadoop.hdfs.protocol.CachePoolInfo; -import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; -import org.apache.hadoop.hdfs.server.federation.MockResolver; -import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; -import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; -import org.apache.hadoop.ipc.CallerContext; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Mockito; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.concurrent.TimeUnit; - -import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; -import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TestRouterAsyncCacheAdmin { - private static Configuration routerConf; - /** Federated HDFS cluster. */ - private static MiniRouterDFSCluster cluster; - private static String ns0; - - /** Random Router for this federated cluster. */ - private MiniRouterDFSCluster.RouterContext router; - private FileSystem routerFs; - private RouterRpcServer routerRpcServer; - private RouterAsyncCacheAdmin asyncCacheAdmin; - - @BeforeClass - public static void setUpCluster() throws Exception { - cluster = new MiniRouterDFSCluster(true, 1, 2, - DEFAULT_HEARTBEAT_INTERVAL_MS, 1000); - cluster.setNumDatanodesPerNameservice(3); - - cluster.startCluster(); - - // Making one Namenode active per nameservice - if (cluster.isHighAvailability()) { - for (String ns : cluster.getNameservices()) { - cluster.switchToActive(ns, NAMENODES[0]); - cluster.switchToStandby(ns, NAMENODES[1]); - } - } - // Start routers with only an RPC service - routerConf = new RouterConfigBuilder() - .rpc() - .build(); - - // Reduce the number of RPC clients threads to overload the Router easy - routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1); - routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1); - routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1); - // We decrease the DN cache times to make the test faster - routerConf.setTimeDuration( - RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); - cluster.addRouterOverrides(routerConf); - // Start routers with only an RPC service - cluster.startRouters(); - - // Register and verify all NNs with all routers - cluster.registerNamenodes(); - cluster.waitNamenodeRegistration(); - cluster.waitActiveNamespaces(); - ns0 = cluster.getNameservices().get(0); - } - - @AfterClass - public static void shutdownCluster() throws Exception { - if (cluster != null) { - cluster.shutdown(); - } - } - - @Before - public void setUp() throws IOException { - router = cluster.getRandomRouter(); - routerFs = router.getFileSystem(); - routerRpcServer = router.getRouterRpcServer(); - routerRpcServer.initAsyncThreadPool(); - RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient( - routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), - routerRpcServer.getRPCMonitor(), - routerRpcServer.getRouterStateIdContext()); - RouterRpcServer spy = Mockito.spy(routerRpcServer); - Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient); - asyncCacheAdmin = new RouterAsyncCacheAdmin(spy); - - // Create mock locations - MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver(); - resolver.addLocation("/", ns0, "/"); - FSDataOutputStream fsDataOutputStream = routerFs.create( - new Path("/testCache.file"), true); - fsDataOutputStream.write(new byte[1024]); - fsDataOutputStream.close(); - } - - @After - public void tearDown() throws IOException { - // clear client context - CallerContext.setCurrent(null); - boolean delete = routerFs.delete(new Path("/testCache.file")); - assertTrue(delete); - if (routerFs != null) { - routerFs.close(); - } - } - - @Test - public void testRouterAsyncCacheAdmin() throws Exception { - asyncCacheAdmin.addCachePool(new CachePoolInfo("pool")); - syncReturn(null); - - CacheDirectiveInfo path = new CacheDirectiveInfo.Builder(). - setPool("pool"). - setPath(new Path("/testCache.file")). - build(); - asyncCacheAdmin.addCacheDirective(path, EnumSet.of(CacheFlag.FORCE)); - long result = syncReturn(long.class); - assertEquals(1, result); - - asyncCacheAdmin.listCachePools(""); - BatchedEntries<CachePoolEntry> cachePoolEntries = syncReturn(BatchedEntries.class); - assertEquals("pool", cachePoolEntries.get(0).getInfo().getPoolName()); - - CacheDirectiveInfo filter = new CacheDirectiveInfo.Builder(). - setPool("pool"). - build(); - asyncCacheAdmin.listCacheDirectives(0, filter); - BatchedEntries<CacheDirectiveEntry> cacheDirectiveEntries = syncReturn(BatchedEntries.class); - assertEquals(new Path("/testCache.file"), cacheDirectiveEntries.get(0).getInfo().getPath()); - - CachePoolInfo pool = new CachePoolInfo("pool").setOwnerName("pool_user"); - asyncCacheAdmin.modifyCachePool(pool); - syncReturn(null); - - asyncCacheAdmin.listCachePools(""); - cachePoolEntries = syncReturn(BatchedEntries.class); - assertEquals("pool_user", cachePoolEntries.get(0).getInfo().getOwnerName()); - - path = new CacheDirectiveInfo.Builder(). - setPool("pool"). - setPath(new Path("/testCache.file")). - setReplication((short) 2). - setId(1L). - build(); - asyncCacheAdmin.modifyCacheDirective(path, EnumSet.of(CacheFlag.FORCE)); - syncReturn(null); - - asyncCacheAdmin.listCacheDirectives(0, filter); - cacheDirectiveEntries = syncReturn(BatchedEntries.class); - assertEquals(Short.valueOf((short) 2), cacheDirectiveEntries.get(0).getInfo().getReplication()); - - asyncCacheAdmin.removeCacheDirective(1L); - syncReturn(null); - asyncCacheAdmin.removeCachePool("pool"); - syncReturn(null); - } -} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcServer.java deleted file mode 100644 index 72dc6815442..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcServer.java +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; -import org.apache.hadoop.hdfs.server.federation.MockResolver; -import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; -import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; -import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; -import org.apache.hadoop.ipc.CallerContext; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Mockito; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; -import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -/** - * Used to test the async functionality of {@link RouterRpcServer}. - */ -public class TestRouterAsyncRpcServer { - private static Configuration routerConf; - /** Federated HDFS cluster. */ - private static MiniRouterDFSCluster cluster; - private static String ns0; - - /** Random Router for this federated cluster. */ - private MiniRouterDFSCluster.RouterContext router; - private FileSystem routerFs; - private RouterRpcServer asyncRouterRpcServer; - - @BeforeClass - public static void setUpCluster() throws Exception { - cluster = new MiniRouterDFSCluster(true, 1, 2, - DEFAULT_HEARTBEAT_INTERVAL_MS, 1000); - cluster.setNumDatanodesPerNameservice(3); - - cluster.startCluster(); - - // Making one Namenode active per nameservice - if (cluster.isHighAvailability()) { - for (String ns : cluster.getNameservices()) { - cluster.switchToActive(ns, NAMENODES[0]); - cluster.switchToStandby(ns, NAMENODES[1]); - } - } - // Start routers with only an RPC service - routerConf = new RouterConfigBuilder() - .rpc() - .build(); - - // Reduce the number of RPC clients threads to overload the Router easy - routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1); - routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1); - routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1); - // We decrease the DN cache times to make the test faster - routerConf.setTimeDuration( - RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); - cluster.addRouterOverrides(routerConf); - // Start routers with only an RPC service - cluster.startRouters(); - - // Register and verify all NNs with all routers - cluster.registerNamenodes(); - cluster.waitNamenodeRegistration(); - cluster.waitActiveNamespaces(); - ns0 = cluster.getNameservices().get(0); - } - - @AfterClass - public static void shutdownCluster() throws Exception { - if (cluster != null) { - cluster.shutdown(); - } - } - - @Before - public void setUp() throws IOException { - router = cluster.getRandomRouter(); - routerFs = router.getFileSystem(); - RouterRpcServer routerRpcServer = router.getRouterRpcServer(); - routerRpcServer.initAsyncThreadPool(); - RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient( - routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), - routerRpcServer.getRPCMonitor(), - routerRpcServer.getRouterStateIdContext()); - asyncRouterRpcServer = Mockito.spy(routerRpcServer); - Mockito.when(asyncRouterRpcServer.getRPCClient()).thenReturn(asyncRpcClient); - - // Create mock locations - MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver(); - resolver.addLocation("/", ns0, "/"); - FsPermission permission = new FsPermission("705"); - routerFs.mkdirs(new Path("/testdir"), permission); - } - - @After - public void tearDown() throws IOException { - // clear client context - CallerContext.setCurrent(null); - boolean delete = routerFs.delete(new Path("/testdir")); - assertTrue(delete); - if (routerFs != null) { - routerFs.close(); - } - } - - /** - * Test that the async RPC server can invoke a method at an available Namenode. - */ - @Test - public void testInvokeAtAvailableNsAsync() throws Exception { - RemoteMethod method = new RemoteMethod("getStoragePolicies"); - asyncRouterRpcServer.invokeAtAvailableNsAsync(method, BlockStoragePolicy[].class); - BlockStoragePolicy[] storagePolicies = syncReturn(BlockStoragePolicy[].class); - assertEquals(8, storagePolicies.length); - } - - /** - * Test get create location async. - */ - @Test - public void testGetCreateLocationAsync() throws Exception { - final List<RemoteLocation> locations = - asyncRouterRpcServer.getLocationsForPath("/testdir", true); - asyncRouterRpcServer.getCreateLocationAsync("/testdir", locations); - RemoteLocation remoteLocation = syncReturn(RemoteLocation.class); - assertNotNull(remoteLocation); - assertEquals(ns0, remoteLocation.getNameserviceId()); - } - - /** - * Test get datanode report async. - */ - @Test - public void testGetDatanodeReportAsync() throws Exception { - asyncRouterRpcServer.getDatanodeReportAsync( - HdfsConstants.DatanodeReportType.ALL, true, 0); - DatanodeInfo[] datanodeInfos = syncReturn(DatanodeInfo[].class); - assertEquals(3, datanodeInfos.length); - - // Get the namespace where the datanode is located - asyncRouterRpcServer.getDatanodeStorageReportMapAsync(HdfsConstants.DatanodeReportType.ALL); - Map<String, DatanodeStorageReport[]> map = syncReturn(Map.class); - assertEquals(1, map.size()); - assertEquals(3, map.get(ns0).length); - - DatanodeInfo[] slowDatanodeReport1 = - asyncRouterRpcServer.getSlowDatanodeReport(true, 0); - - asyncRouterRpcServer.getSlowDatanodeReportAsync(true, 0); - DatanodeInfo[] slowDatanodeReport2 = syncReturn(DatanodeInfo[].class); - assertEquals(slowDatanodeReport1, slowDatanodeReport2); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncStoragePolicy.java deleted file mode 100644 index 6671d2d1d8d..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncStoragePolicy.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; -import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; -import org.apache.hadoop.hdfs.server.federation.MockResolver; -import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; -import org.apache.hadoop.ipc.CallerContext; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Mockito; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; -import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; - -public class TestRouterAsyncStoragePolicy { - private static Configuration routerConf; - /** Federated HDFS cluster. */ - private static MiniRouterDFSCluster cluster; - private static String ns0; - - /** Random Router for this federated cluster. */ - private MiniRouterDFSCluster.RouterContext router; - private FileSystem routerFs; - private RouterRpcServer routerRpcServer; - private RouterAsyncStoragePolicy asyncStoragePolicy; - - private final String testfilePath = "/testdir/testAsyncStoragePolicy.file"; - - @BeforeClass - public static void setUpCluster() throws Exception { - cluster = new MiniRouterDFSCluster(true, 1, 2, - DEFAULT_HEARTBEAT_INTERVAL_MS, 1000); - cluster.setNumDatanodesPerNameservice(3); - - cluster.startCluster(); - - // Making one Namenode active per nameservice - if (cluster.isHighAvailability()) { - for (String ns : cluster.getNameservices()) { - cluster.switchToActive(ns, NAMENODES[0]); - cluster.switchToStandby(ns, NAMENODES[1]); - } - } - // Start routers with only an RPC service - routerConf = new RouterConfigBuilder() - .rpc() - .build(); - - // Reduce the number of RPC clients threads to overload the Router easy - routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1); - routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1); - routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1); - // We decrease the DN cache times to make the test faster - routerConf.setTimeDuration( - RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); - cluster.addRouterOverrides(routerConf); - // Start routers with only an RPC service - cluster.startRouters(); - - // Register and verify all NNs with all routers - cluster.registerNamenodes(); - cluster.waitNamenodeRegistration(); - cluster.waitActiveNamespaces(); - ns0 = cluster.getNameservices().get(0); - } - - @AfterClass - public static void shutdownCluster() throws Exception { - if (cluster != null) { - cluster.shutdown(); - } - } - - @Before - public void setUp() throws IOException { - router = cluster.getRandomRouter(); - routerFs = router.getFileSystem(); - routerRpcServer = router.getRouterRpcServer(); - routerRpcServer.initAsyncThreadPool(); - RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient( - routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), - routerRpcServer.getRPCMonitor(), - routerRpcServer.getRouterStateIdContext()); - RouterRpcServer spy = Mockito.spy(routerRpcServer); - Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient); - asyncStoragePolicy = new RouterAsyncStoragePolicy(spy); - - // Create mock locations - MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver(); - resolver.addLocation("/", ns0, "/"); - FsPermission permission = new FsPermission("705"); - routerFs.mkdirs(new Path("/testdir"), permission); - FSDataOutputStream fsDataOutputStream = routerFs.create( - new Path(testfilePath), true); - fsDataOutputStream.write(new byte[1024]); - fsDataOutputStream.close(); - } - - @After - public void tearDown() throws IOException { - // clear client context - CallerContext.setCurrent(null); - boolean delete = routerFs.delete(new Path("/testdir")); - assertTrue(delete); - if (routerFs != null) { - routerFs.close(); - } - } - - @Test - public void testRouterAsyncStoragePolicy() throws Exception { - BlockStoragePolicy[] storagePolicies = cluster.getNamenodes().get(0) - .getClient().getStoragePolicies(); - asyncStoragePolicy.getStoragePolicies(); - BlockStoragePolicy[] storagePoliciesAsync = syncReturn(BlockStoragePolicy[].class); - assertArrayEquals(storagePolicies, storagePoliciesAsync); - - asyncStoragePolicy.getStoragePolicy(testfilePath); - BlockStoragePolicy blockStoragePolicy1 = syncReturn(BlockStoragePolicy.class); - - asyncStoragePolicy.setStoragePolicy(testfilePath, "COLD"); - syncReturn(null); - asyncStoragePolicy.getStoragePolicy(testfilePath); - BlockStoragePolicy blockStoragePolicy2 = syncReturn(BlockStoragePolicy.class); - assertNotEquals(blockStoragePolicy1, blockStoragePolicy2); - assertEquals("COLD", blockStoragePolicy2.getName()); - } -} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncProtocolTestBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java similarity index 97% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncProtocolTestBase.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java index 86969f16953..cc25516d59f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncProtocolTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.async; +package org.apache.hadoop.hdfs.server.federation.router.async; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -25,7 +25,6 @@ import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.MockResolver; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; -import org.apache.hadoop.hdfs.server.federation.router.RouterAsyncRpcClient; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; import org.apache.hadoop.ipc.CallerContext; import org.junit.After; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncCacheAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncCacheAdmin.java new file mode 100644 index 00000000000..ef82ec267eb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncCacheAdmin.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.fs.CacheFlag; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.CachePoolEntry; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.EnumSet; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; +import static org.junit.Assert.assertEquals; + +/** + * Used to test the functionality of {@link RouterAsyncCacheAdmin}. + */ +public class TestRouterAsyncCacheAdmin extends RouterAsyncProtocolTestBase { + private RouterAsyncCacheAdmin asyncCacheAdmin; + + @Before + public void setup() throws IOException { + asyncCacheAdmin = new RouterAsyncCacheAdmin(getRouterAsyncRpcServer()); + FSDataOutputStream fsDataOutputStream = getRouterFs().create( + new Path("/testCache.file"), true); + fsDataOutputStream.write(new byte[1024]); + fsDataOutputStream.close(); + } + + @Test + public void testRouterAsyncCacheAdmin() throws Exception { + asyncCacheAdmin.addCachePool(new CachePoolInfo("pool")); + syncReturn(null); + + CacheDirectiveInfo path = new CacheDirectiveInfo.Builder(). + setPool("pool"). + setPath(new Path("/testCache.file")). + build(); + asyncCacheAdmin.addCacheDirective(path, EnumSet.of(CacheFlag.FORCE)); + long result = syncReturn(long.class); + assertEquals(1, result); + + asyncCacheAdmin.listCachePools(""); + BatchedEntries<CachePoolEntry> cachePoolEntries = syncReturn(BatchedEntries.class); + assertEquals("pool", cachePoolEntries.get(0).getInfo().getPoolName()); + + CacheDirectiveInfo filter = new CacheDirectiveInfo.Builder(). + setPool("pool"). + build(); + asyncCacheAdmin.listCacheDirectives(0, filter); + BatchedEntries<CacheDirectiveEntry> cacheDirectiveEntries = syncReturn(BatchedEntries.class); + assertEquals(new Path("/testCache.file"), cacheDirectiveEntries.get(0).getInfo().getPath()); + + CachePoolInfo pool = new CachePoolInfo("pool").setOwnerName("pool_user"); + asyncCacheAdmin.modifyCachePool(pool); + syncReturn(null); + + asyncCacheAdmin.listCachePools(""); + cachePoolEntries = syncReturn(BatchedEntries.class); + assertEquals("pool_user", cachePoolEntries.get(0).getInfo().getOwnerName()); + + path = new CacheDirectiveInfo.Builder(). + setPool("pool"). + setPath(new Path("/testCache.file")). + setReplication((short) 2). + setId(1L). + build(); + asyncCacheAdmin.modifyCacheDirective(path, EnumSet.of(CacheFlag.FORCE)); + syncReturn(null); + + asyncCacheAdmin.listCacheDirectives(0, filter); + cacheDirectiveEntries = syncReturn(BatchedEntries.class); + assertEquals(Short.valueOf((short) 2), cacheDirectiveEntries.get(0).getInfo().getReplication()); + + asyncCacheAdmin.removeCacheDirective(1L); + syncReturn(null); + asyncCacheAdmin.removeCachePool("pool"); + syncReturn(null); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java similarity index 97% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncErasureCoding.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java index 047cf6bdb55..86ba2b2aed8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncErasureCoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router; +package org.apache.hadoop.hdfs.server.federation.router.async; import org.apache.hadoop.conf.Configuration; @@ -32,6 +32,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.MockResolver; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.ipc.CallerContext; import org.junit.After; @@ -49,7 +51,7 @@ import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMEN import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncNamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncNamenodeProtocol.java similarity index 98% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncNamenodeProtocol.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncNamenodeProtocol.java index 86081260536..1814031cfb7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncNamenodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncNamenodeProtocol.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.async; +package org.apache.hadoop.hdfs.server.federation.router.async; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -27,7 +27,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.junit.Before; import org.junit.Test; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java similarity index 96% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncQuota.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java index 0b1eeeec0be..ecbf916aaff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncQuota.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router; +package org.apache.hadoop.hdfs.server.federation.router.async; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -27,6 +27,8 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.MockResolver; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; import org.apache.hadoop.ipc.CallerContext; import org.junit.After; import org.junit.AfterClass; @@ -44,7 +46,7 @@ import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMEN import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertTrue; public class TestRouterAsyncQuota { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java similarity index 96% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcClient.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java index e3429d493dc..e0ce4746cda 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router; +package org.apache.hadoop.hdfs.server.federation.router.async; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -30,6 +30,11 @@ import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; +import org.apache.hadoop.hdfs.server.federation.router.RemoteParam; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.RetriableException; @@ -54,7 +59,7 @@ import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMEN import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcServer.java new file mode 100644 index 00000000000..c022789a2eb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcServer.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Used to test the async functionality of {@link RouterRpcServer}. + */ +public class TestRouterAsyncRpcServer extends RouterAsyncProtocolTestBase { + private RouterRpcServer asyncRouterRpcServer; + + @Before + public void setup() throws IOException { + asyncRouterRpcServer = getRouterAsyncRpcServer(); + } + + /** + * Test that the async RPC server can invoke a method at an available Namenode. + */ + @Test + public void testInvokeAtAvailableNsAsync() throws Exception { + RemoteMethod method = new RemoteMethod("getStoragePolicies"); + asyncRouterRpcServer.invokeAtAvailableNsAsync(method, BlockStoragePolicy[].class); + BlockStoragePolicy[] storagePolicies = syncReturn(BlockStoragePolicy[].class); + assertEquals(8, storagePolicies.length); + } + + /** + * Test get create location async. + */ + @Test + public void testGetCreateLocationAsync() throws Exception { + final List<RemoteLocation> locations = + asyncRouterRpcServer.getLocationsForPath("/testdir", true); + asyncRouterRpcServer.getCreateLocationAsync("/testdir", locations); + RemoteLocation remoteLocation = syncReturn(RemoteLocation.class); + assertNotNull(remoteLocation); + assertEquals(getNs0(), remoteLocation.getNameserviceId()); + } + + /** + * Test get datanode report async. + */ + @Test + public void testGetDatanodeReportAsync() throws Exception { + asyncRouterRpcServer.getDatanodeReportAsync( + HdfsConstants.DatanodeReportType.ALL, true, 0); + DatanodeInfo[] datanodeInfos = syncReturn(DatanodeInfo[].class); + assertEquals(3, datanodeInfos.length); + + // Get the namespace where the datanode is located + asyncRouterRpcServer.getDatanodeStorageReportMapAsync(HdfsConstants.DatanodeReportType.ALL); + Map<String, DatanodeStorageReport[]> map = syncReturn(Map.class); + assertEquals(1, map.size()); + assertEquals(3, map.get(getNs0()).length); + + DatanodeInfo[] slowDatanodeReport1 = + asyncRouterRpcServer.getSlowDatanodeReport(true, 0); + + asyncRouterRpcServer.getSlowDatanodeReportAsync(true, 0); + DatanodeInfo[] slowDatanodeReport2 = syncReturn(DatanodeInfo[].class); + assertEquals(slowDatanodeReport1, slowDatanodeReport2); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncSnapshot.java similarity index 50% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncSnapshot.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncSnapshot.java index 49a682cea4b..a44664ec23d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncSnapshot.java @@ -15,135 +15,43 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router; +package org.apache.hadoop.hdfs.server.federation.router.async; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing; import org.apache.hadoop.hdfs.protocol.SnapshotException; import org.apache.hadoop.hdfs.protocol.SnapshotStatus; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; -import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; -import org.apache.hadoop.hdfs.server.federation.MockResolver; -import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; -import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.test.LambdaTestUtils; -import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mockito; - import java.io.IOException; -import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.MODIFY; -import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; -import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TestRouterAsyncSnapshot { - private static Configuration routerConf; - /** Federated HDFS cluster. */ - private static MiniRouterDFSCluster cluster; - private static String ns0; - /** Random Router for this federated cluster. */ - private MiniRouterDFSCluster.RouterContext router; +/** + * Used to test the functionality of {@link RouterAsyncSnapshot}. + */ +public class TestRouterAsyncSnapshot extends RouterAsyncProtocolTestBase { + private final String testFile = "/testdir/testSnapshot.file"; private FileSystem routerFs; - private RouterRpcServer routerRpcServer; private RouterAsyncSnapshot asyncSnapshot; - @BeforeClass - public static void setUpCluster() throws Exception { - cluster = new MiniRouterDFSCluster(true, 1, 2, - DEFAULT_HEARTBEAT_INTERVAL_MS, 1000); - cluster.setNumDatanodesPerNameservice(3); - - cluster.startCluster(); - - // Making one Namenode active per nameservice - if (cluster.isHighAvailability()) { - for (String ns : cluster.getNameservices()) { - cluster.switchToActive(ns, NAMENODES[0]); - cluster.switchToStandby(ns, NAMENODES[1]); - } - } - // Start routers with only an RPC service - routerConf = new RouterConfigBuilder() - .rpc() - .build(); - - // Reduce the number of RPC clients threads to overload the Router easy - routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1); - routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1); - routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1); - // We decrease the DN cache times to make the test faster - routerConf.setTimeDuration( - RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); - cluster.addRouterOverrides(routerConf); - // Start routers with only an RPC service - cluster.startRouters(); - - // Register and verify all NNs with all routers - cluster.registerNamenodes(); - cluster.waitNamenodeRegistration(); - cluster.waitActiveNamespaces(); - ns0 = cluster.getNameservices().get(0); - } - - @AfterClass - public static void shutdownCluster() throws Exception { - if (cluster != null) { - cluster.shutdown(); - } - } - @Before - public void setUp() throws IOException { - router = cluster.getRandomRouter(); - routerFs = router.getFileSystem(); - routerRpcServer = router.getRouterRpcServer(); - routerRpcServer.initAsyncThreadPool(); - RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient( - routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), - routerRpcServer.getRPCMonitor(), - routerRpcServer.getRouterStateIdContext()); - RouterRpcServer spy = Mockito.spy(routerRpcServer); - Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient); - asyncSnapshot = new RouterAsyncSnapshot(spy); - - // Create mock locations - MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver(); - resolver.addLocation("/", ns0, "/"); - FsPermission permission = new FsPermission("705"); - routerFs.mkdirs(new Path("/testdir"), permission); + public void setup() throws IOException { + routerFs = getRouterFs(); + asyncSnapshot = new RouterAsyncSnapshot(getRouterAsyncRpcServer()); FSDataOutputStream fsDataOutputStream = routerFs.create( - new Path("/testdir/testSnapshot.file"), true); + new Path(testFile), true); fsDataOutputStream.write(new byte[1024]); fsDataOutputStream.close(); } - @After - public void tearDown() throws IOException { - // clear client context - CallerContext.setCurrent(null); - boolean delete = routerFs.delete(new Path("/testdir")); - assertTrue(delete); - if (routerFs != null) { - routerFs.close(); - } - } - @Test public void testRouterAsyncSnapshot() throws Exception { asyncSnapshot.allowSnapshot("/testdir"); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncStoragePolicy.java new file mode 100644 index 00000000000..d2afe9ad4af --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncStoragePolicy.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.junit.Before; +import org.junit.Test; +import java.io.IOException; + +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +/** + * Used to test the functionality of {@link RouterAsyncStoragePolicy}. + */ +public class TestRouterAsyncStoragePolicy extends RouterAsyncProtocolTestBase { + private final String testfilePath = "/testdir/testAsyncStoragePolicy.file"; + private RouterAsyncStoragePolicy asyncStoragePolicy; + + @Before + public void setup() throws IOException { + asyncStoragePolicy = new RouterAsyncStoragePolicy(getRouterAsyncRpcServer()); + FSDataOutputStream fsDataOutputStream = getRouterFs().create( + new Path(testfilePath), true); + fsDataOutputStream.write(new byte[1024]); + fsDataOutputStream.close(); + } + + @Test + public void testRouterAsyncStoragePolicy() throws Exception { + BlockStoragePolicy[] storagePolicies = getCluster().getNamenodes().get(0) + .getClient().getStoragePolicies(); + asyncStoragePolicy.getStoragePolicies(); + BlockStoragePolicy[] storagePoliciesAsync = syncReturn(BlockStoragePolicy[].class); + assertArrayEquals(storagePolicies, storagePoliciesAsync); + + asyncStoragePolicy.getStoragePolicy(testfilePath); + BlockStoragePolicy blockStoragePolicy1 = syncReturn(BlockStoragePolicy.class); + + asyncStoragePolicy.setStoragePolicy(testfilePath, "COLD"); + syncReturn(null); + asyncStoragePolicy.getStoragePolicy(testfilePath); + BlockStoragePolicy blockStoragePolicy2 = syncReturn(BlockStoragePolicy.class); + assertNotEquals(blockStoragePolicy1, blockStoragePolicy2); + assertEquals("COLD", blockStoragePolicy2.getName()); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncUserProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncUserProtocol.java similarity index 94% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncUserProtocol.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncUserProtocol.java index a3fcd6109e5..ce76be9ed7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncUserProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncUserProtocol.java @@ -15,13 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.async; +package org.apache.hadoop.hdfs.server.federation.router.async; import org.apache.hadoop.security.UserGroupInformation; import org.junit.Before; import org.junit.Test; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertArrayEquals; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncClass.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncClass.java similarity index 95% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncClass.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncClass.java index e5bf7ce08e8..bfc172edc02 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncClass.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncClass.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router.async; +package org.apache.hadoop.hdfs.server.federation.router.async.utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,15 +28,15 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.function.Function; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCurrent; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncFinally; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncThrowException; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCurrent; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncFinally; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncThrowException; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry; /** * AsyncClass demonstrates the conversion of synchronous methods diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/BaseClass.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/BaseClass.java similarity index 97% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/BaseClass.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/BaseClass.java index 8d5b5b1dc82..084806d65c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/BaseClass.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/BaseClass.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router.async; +package org.apache.hadoop.hdfs.server.federation.router.async.utils; import java.io.IOException; import java.util.List; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/SyncClass.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/SyncClass.java similarity index 98% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/SyncClass.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/SyncClass.java index e55edb098e1..805b955661d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/SyncClass.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/SyncClass.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router.async; +package org.apache.hadoop.hdfs.server.federation.router.async.utils; import java.io.IOException; import java.util.ArrayList; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestAsyncUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/TestAsyncUtil.java similarity index 99% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestAsyncUtil.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/TestAsyncUtil.java index c540af612b9..644f639ac9e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestAsyncUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/TestAsyncUtil.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.federation.router.async; +package org.apache.hadoop.hdfs.server.federation.router.async.utils; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.Time; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/TestRouterSecurityManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/TestRouterSecurityManager.java index 9b2b5a06588..b73764cbe18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/TestRouterSecurityManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/TestRouterSecurityManager.java @@ -132,9 +132,7 @@ public class TestRouterSecurityManager { // Cancel the delegation token securityManager.cancelDelegationToken(token); - String exceptionCause = "Renewal request for unknown token"; exceptionRule.expect(SecretManager.InvalidToken.class); - exceptionRule.expectMessage(exceptionCause); // This throws an exception as token has been cancelled. securityManager.renewDelegationToken(token); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org