This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
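For orientation before the diff: the new RouterAsync* classes in this patch all follow the same calling convention, visible in the tests it adds. The protocol method queues the RPC on the router's async client and immediately returns a placeholder via AsyncUtil.asyncReturn; the caller later retrieves the real value with AsyncUtil.syncReturn. The minimal sketch below only illustrates that convention under those assumptions; the wrapper class FetchBlockKeysSketch and its method are hypothetical, while RouterAsyncNamenodeProtocol, ExportedBlockKeys and AsyncUtil.syncReturn are taken from the patch.

    import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
    import org.apache.hadoop.hdfs.server.federation.async.RouterAsyncNamenodeProtocol;

    import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;

    public class FetchBlockKeysSketch {
      // The async protocol returns a stub right away; the real result is produced
      // on the router's async responder threads and fetched afterwards (this mirrors
      // the usage in TestRouterAsyncNamenodeProtocol below).
      static ExportedBlockKeys fetchBlockKeys(RouterAsyncNamenodeProtocol asyncProtocol)
          throws Exception {
        asyncProtocol.getBlockKeys();                // queues the RPC, returns a placeholder
        return syncReturn(ExportedBlockKeys.class);  // retrieves the completed async result
      }
    }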
commit 060f6b866c348d62a68e68e1ce5af7e524944e50
Author: Jian Zhang <keeprom...@apache.org>
AuthorDate: Mon Nov 25 14:35:41 2024 +0800

    HDFS-17656. [ARR] RouterNamenodeProtocol and RouterUserProtocol supports asynchronous rpc. (#7159). Contributed by Jian Zhang.

    Reviewed-by: Jian Zhang <keeprom...@apache.org>
    Signed-off-by: Jian Zhang <keeprom...@apache.org>
---
 .../async/RouterAsyncNamenodeProtocol.java          | 198 +++++++++++++++++++++
 .../federation/async/RouterAsyncUserProtocol.java   | 129 ++++++++++++++
 .../hdfs/server/federation/async/package-info.java  |  31 ++++
 .../federation/router/RouterAdminServer.java        |   6 +-
 .../server/federation/router/RouterRpcServer.java   |  24 +--
 .../async/RouterAsyncProtocolTestBase.java          | 164 +++++++++++++++++
 .../async/TestRouterAsyncNamenodeProtocol.java      | 126 +++++++++++++
 .../async/TestRouterAsyncUserProtocol.java          |  48 +++++
 8 files changed, 713 insertions(+), 13 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncNamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncNamenodeProtocol.java
new file mode 100644
index 00000000000..fe05a57b854
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncNamenodeProtocol.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hdfs.server.federation.async; + +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; +import org.apache.hadoop.hdfs.server.federation.router.RouterNamenodeProtocol; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.router.async.AsyncApplyFunction; +import org.apache.hadoop.hdfs.server.namenode.NNStorage; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import java.io.IOException; +import java.util.Map; + +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; + +/** + * Module that implements all the asynchronous RPC calls in {@link NamenodeProtocol} in the + * {@link RouterRpcServer}. + */ +public class RouterAsyncNamenodeProtocol extends RouterNamenodeProtocol { + + /** RPC server to receive client calls. */ + private final RouterRpcServer rpcServer; + /** RPC clients to connect to the Namenodes. */ + private final RouterRpcClient rpcClient; + + public RouterAsyncNamenodeProtocol(RouterRpcServer server) { + super(server); + this.rpcServer = server; + this.rpcClient = this.rpcServer.getRPCClient(); + } + + /** + * Asynchronously get a list of blocks belonging to <code>datanode</code> + * whose total size equals <code>size</code>. 
+ * + * @see org.apache.hadoop.hdfs.server.balancer.Balancer + * @param datanode a data node + * @param size requested size + * @param minBlockSize each block should be of this minimum Block Size + * @param hotBlockTimeInterval prefer to get blocks which are belong to + * the cold files accessed before the time interval + * @param storageType the given storage type {@link StorageType} + * @return BlocksWithLocations a list of blocks & their locations + * @throws IOException if size is less than or equal to 0 or + datanode does not exist + */ + @Override + public BlocksWithLocations getBlocks( + DatanodeInfo datanode, long size, + long minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + // Get the namespace where the datanode is located + rpcServer.getDatanodeStorageReportMapAsync(HdfsConstants.DatanodeReportType.ALL); + asyncApply((AsyncApplyFunction<Map<String, DatanodeStorageReport[]>, Object>) map -> { + String nsId = null; + for (Map.Entry<String, DatanodeStorageReport[]> entry : map.entrySet()) { + DatanodeStorageReport[] dns = entry.getValue(); + for (DatanodeStorageReport dn : dns) { + DatanodeInfo dnInfo = dn.getDatanodeInfo(); + if (dnInfo.getDatanodeUuid().equals(datanode.getDatanodeUuid())) { + nsId = entry.getKey(); + break; + } + } + // Break the loop if already found + if (nsId != null) { + break; + } + } + // Forward to the proper namenode + if (nsId != null) { + RemoteMethod method = new RemoteMethod( + NamenodeProtocol.class, "getBlocks", new Class<?>[] + {DatanodeInfo.class, long.class, long.class, long.class, StorageType.class}, + datanode, size, minBlockSize, hotBlockTimeInterval, storageType); + rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class); + } else { + asyncComplete(null); + } + }); + return asyncReturn(BlocksWithLocations.class); + } + + /** + * Asynchronously get the current block keys. + * + * @return ExportedBlockKeys containing current block keys + * @throws IOException if there is no namespace available or other ioExceptions. + */ + @Override + public ExportedBlockKeys getBlockKeys() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = + new RemoteMethod(NamenodeProtocol.class, "getBlockKeys"); + rpcServer.invokeAtAvailableNsAsync(method, ExportedBlockKeys.class); + return asyncReturn(ExportedBlockKeys.class); + } + + /** + * Asynchronously get the most recent transaction ID. + * + * @return The most recent transaction ID that has been synced to + * persistent storage, or applied from persistent storage in the + * case of a non-active node. + * @throws IOException if there is no namespace available or other ioExceptions. + */ + @Override + public long getTransactionID() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = + new RemoteMethod(NamenodeProtocol.class, "getTransactionID"); + rpcServer.invokeAtAvailableNsAsync(method, long.class); + return asyncReturn(Long.class); + } + + /** + * Asynchronously get the transaction ID of the most recent checkpoint. + * + * @return The transaction ID of the most recent checkpoint. + * @throws IOException if there is no namespace available or other ioExceptions. 
+ */ + @Override + public long getMostRecentCheckpointTxId() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = + new RemoteMethod(NamenodeProtocol.class, "getMostRecentCheckpointTxId"); + rpcServer.invokeAtAvailableNsAsync(method, long.class); + return asyncReturn(Long.class); + } + + /** + * Asynchronously get the transaction ID of the most recent checkpoint + * for the given NameNodeFile. + * + * @return The transaction ID of the most recent checkpoint + * for the given NameNodeFile. + * @throws IOException if there is no namespace available or other ioExceptions. + */ + @Override + public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = + new RemoteMethod(NamenodeProtocol.class, "getMostRecentNameNodeFileTxId", + new Class<?>[] {NNStorage.NameNodeFile.class}, nnf); + rpcServer.invokeAtAvailableNsAsync(method, long.class); + return asyncReturn(Long.class); + } + + /** + * Asynchronously request name-node version and storage information. + * + * @return {@link NamespaceInfo} identifying versions and storage information + * of the name-node. + * @throws IOException if there is no namespace available or other ioExceptions. + */ + @Override + public NamespaceInfo versionRequest() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = + new RemoteMethod(NamenodeProtocol.class, "versionRequest"); + rpcServer.invokeAtAvailableNsAsync(method, NamespaceInfo.class); + return asyncReturn(NamespaceInfo.class); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncUserProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncUserProtocol.java new file mode 100644 index 00000000000..3e03a9bdd5c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncUserProtocol.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.async; + +import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.router.RouterUserProtocol; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.security.Groups; +import org.apache.hadoop.security.RefreshUserMappingsProtocol; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hadoop.tools.GetUserMappingsProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; + +/** + * Module that implements all the asynchronous RPC calls in + * {@link RefreshUserMappingsProtocol} {@link GetUserMappingsProtocol} in the + * {@link RouterRpcServer}. + */ +public class RouterAsyncUserProtocol extends RouterUserProtocol { + private static final Logger LOG = + LoggerFactory.getLogger(RouterAsyncUserProtocol.class); + + /** RPC server to receive client calls. */ + private final RouterRpcServer rpcServer; + /** RPC clients to connect to the Namenodes. */ + private final RouterRpcClient rpcClient; + + private final ActiveNamenodeResolver namenodeResolver; + + public RouterAsyncUserProtocol(RouterRpcServer server) { + super(server); + this.rpcServer = server; + this.rpcClient = this.rpcServer.getRPCClient(); + this.namenodeResolver = this.rpcServer.getNamenodeResolver(); + } + + /** + * Asynchronously refresh user to group mappings. + * @throws IOException raised on errors performing I/O. + */ + @Override + public void refreshUserToGroupsMappings() throws IOException { + LOG.debug("Refresh user groups mapping in Router."); + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + if (nss.isEmpty()) { + Groups.getUserToGroupsMappingService().refresh(); + asyncComplete(null); + } else { + RemoteMethod method = new RemoteMethod(RefreshUserMappingsProtocol.class, + "refreshUserToGroupsMappings"); + rpcClient.invokeConcurrent(nss, method); + } + } + + /** + * Asynchronously refresh superuser proxy group list. + * @throws IOException raised on errors performing I/O. 
+ */ + @Override + public void refreshSuperUserGroupsConfiguration() throws IOException { + LOG.debug("Refresh superuser groups configuration in Router."); + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + if (nss.isEmpty()) { + ProxyUsers.refreshSuperUserGroupsConfiguration(); + asyncComplete(null); + } else { + RemoteMethod method = new RemoteMethod(RefreshUserMappingsProtocol.class, + "refreshSuperUserGroupsConfiguration"); + rpcClient.invokeConcurrent(nss, method); + } + } + + /** + * Asynchronously get the groups which are mapped to the given user. + * @param user The user to get the groups for. + * @return The set of groups the user belongs to. + * @throws IOException raised on errors performing I/O. + */ + @Override + public String[] getGroupsForUser(String user) throws IOException { + LOG.debug("Getting groups for user {}", user); + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + if (nss.isEmpty()) { + asyncComplete(UserGroupInformation.createRemoteUser(user) + .getGroupNames()); + } else { + RemoteMethod method = new RemoteMethod(GetUserMappingsProtocol.class, + "getGroupsForUser", new Class<?>[] {String.class}, user); + rpcClient.invokeConcurrent(nss, method, String[].class); + asyncApply((ApplyFunction<Map<FederationNamespaceInfo, String[]>, String[]>) + results -> merge(results, String.class)); + } + return asyncReturn(String[].class); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/package-info.java new file mode 100644 index 00000000000..36e0513bb6a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/package-info.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains classes that facilitate asynchronous operations within the Hadoop + * Distributed File System (HDFS) Federation router. These classes are designed to work with + * the Hadoop ecosystem, providing utilities and interfaces to perform non-blocking tasks that + * can improve the performance and responsiveness of HDFS operations. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving + +package org.apache.hadoop.hdfs.server.federation.async; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java index 2d96ab1be35..a462b7a5f73 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY; import static org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler.HANDLER_IDENTIFIER; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; import java.io.IOException; import java.net.InetSocketAddress; @@ -627,12 +628,15 @@ private List<String> getDestinationNameServices( Map<RemoteLocation, HdfsFileStatus> responses = rpcClient.invokeConcurrent( locations, method, false, false, HdfsFileStatus.class); + if (rpcServer.isAsync()) { + responses = syncReturn(Map.class); + } for (RemoteLocation location : locations) { if (responses.get(location) != null) { nsIds.add(location.getNameserviceId()); } } - } catch (IOException ioe) { + } catch (Exception ioe) { LOG.error("Cannot get location for {}: {}", src, ioe.getMessage()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index c2c86d9015a..6fb189b0e01 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -72,7 +72,6 @@ import java.util.stream.Collectors; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil; @@ -398,8 +397,7 @@ public RouterRpcServer(Configuration conf, Router router, NotReplicatedYetException.class, IOException.class, ConnectException.class, - RetriableException.class, - PathIsNotEmptyDirectoryException.class); + RetriableException.class); this.rpcServer.addSuppressedLoggingExceptions( StandbyException.class, UnresolvedPathException.class); @@ -464,7 +462,7 @@ public RouterRpcServer(Configuration conf, Router router, /** * Init router async handlers and router async responders. 
*/ - protected void initAsyncThreadPool() { + public void initAsyncThreadPool() { int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT); int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, @@ -607,7 +605,7 @@ BalanceProcedureScheduler getFedRenameScheduler() { * @return routerStateIdContext */ @VisibleForTesting - protected RouterStateIdContext getRouterStateIdContext() { + public RouterStateIdContext getRouterStateIdContext() { return routerStateIdContext; } @@ -710,7 +708,7 @@ void checkOperation(OperationCategory op, boolean supported) * @throws StandbyException If the Router is in safe mode and cannot serve * client requests. */ - void checkOperation(OperationCategory op) + public void checkOperation(OperationCategory op) throws StandbyException { // Log the function we are currently calling. if (rpcMonitor != null) { @@ -776,8 +774,9 @@ static String getMethodName() { * If the namespace is unavailable, retry with other namespaces. * @param <T> expected return type. * @param method the remote method. + * @param clazz the type of return value. * @return the response received after invoking method. - * @throws IOException + * @throws IOException if there is no namespace available or other ioExceptions. */ <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz) throws IOException { @@ -810,10 +809,11 @@ <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz) * Asynchronous version of invokeAtAvailableNs method. * @param <T> expected return type. * @param method the remote method. + * @param clazz the type of return value. * @return the response received after invoking method. - * @throws IOException + * @throws IOException if there is no namespace available or other ioExceptions. */ - <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz) + public <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz) throws IOException { String nsId = subclusterResolver.getDefaultNamespace(); // If default Ns is not present return result from first namespace. @@ -851,7 +851,7 @@ <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz) * @param ioe IOException . * @param nss List of name spaces in the federation * @return the response received after invoking method. - * @throws IOException + * @throws IOException if there is no namespace available or other ioExceptions. */ <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe, Set<FederationNamespaceInfo> nss) throws IOException { @@ -885,7 +885,7 @@ <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe, * @param ioe IOException . * @param nss List of name spaces in the federation * @return the response received after invoking method. - * @throws IOException + * @throws IOException if there is no namespace available or other ioExceptions. */ <T> T invokeOnNsAsync(RemoteMethod method, Class<T> clazz, IOException ioe, Set<FederationNamespaceInfo> nss) throws IOException { @@ -2131,7 +2131,7 @@ static void resetCurrentUser() { * @param clazz Class of the values. * @return Array with the outputs. 
*/ - static <T> T[] merge( + public static <T> T[] merge( Map<FederationNamespaceInfo, T[]> map, Class<T> clazz) { // Put all results into a set to avoid repeats diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncProtocolTestBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncProtocolTestBase.java new file mode 100644 index 00000000000..86969f16953 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncProtocolTestBase.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.async; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.RouterAsyncRpcClient; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.ipc.CallerContext; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; +import static org.junit.Assert.assertTrue; + +/** + * Used to test the functionality of async router rps. + */ +public class RouterAsyncProtocolTestBase { + private static Configuration routerConf; + /** Federated HDFS cluster. */ + private static MiniRouterDFSCluster cluster; + private static String ns0; + + /** Random Router for this federated cluster. 
*/ + private MiniRouterDFSCluster.RouterContext router; + private FileSystem routerFs; + private RouterRpcServer routerRpcServer; + private RouterRpcServer routerAsyncRpcServer; + + @BeforeClass + public static void setUpCluster() throws Exception { + cluster = new MiniRouterDFSCluster(true, 1, 2, + DEFAULT_HEARTBEAT_INTERVAL_MS, 1000); + cluster.setNumDatanodesPerNameservice(3); + + cluster.startCluster(); + + // Making one Namenode active per nameservice + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + cluster.switchToStandby(ns, NAMENODES[1]); + } + } + // Start routers with only an RPC service + routerConf = new RouterConfigBuilder() + .rpc() + .build(); + + // Reduce the number of RPC clients threads to overload the Router easy + routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1); + routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1); + routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1); + // We decrease the DN cache times to make the test faster + routerConf.setTimeDuration( + RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); + cluster.addRouterOverrides(routerConf); + // Start routers with only an RPC service + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + cluster.waitActiveNamespaces(); + ns0 = cluster.getNameservices().get(0); + } + + @AfterClass + public static void shutdownCluster() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Before + public void setUp() throws IOException { + router = cluster.getRandomRouter(); + routerFs = router.getFileSystem(); + routerRpcServer = router.getRouterRpcServer(); + routerRpcServer.initAsyncThreadPool(); + RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient( + routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), + routerRpcServer.getRPCMonitor(), + routerRpcServer.getRouterStateIdContext()); + routerAsyncRpcServer = Mockito.spy(routerRpcServer); + Mockito.when(routerAsyncRpcServer.getRPCClient()).thenReturn(asyncRpcClient); + + // Create mock locations + MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver(); + resolver.addLocation("/", ns0, "/"); + FsPermission permission = new FsPermission("705"); + routerFs.mkdirs(new Path("/testdir"), permission); + } + + @After + public void tearDown() throws IOException { + // clear client context + CallerContext.setCurrent(null); + boolean delete = routerFs.delete(new Path("/testdir")); + assertTrue(delete); + if (routerFs != null) { + routerFs.close(); + } + } + + public static Configuration getRouterConf() { + return routerConf; + } + + public static MiniRouterDFSCluster getCluster() { + return cluster; + } + + public static String getNs0() { + return ns0; + } + + public MiniRouterDFSCluster.RouterContext getRouter() { + return router; + } + + public FileSystem getRouterFs() { + return routerFs; + } + + public RouterRpcServer getRouterRpcServer() { + return routerRpcServer; + } + + public RouterRpcServer getRouterAsyncRpcServer() { + return routerAsyncRpcServer; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncNamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncNamenodeProtocol.java new file mode 100644 index 
00000000000..86081260536 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncNamenodeProtocol.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.async; + +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.server.federation.router.RouterNamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Used to test the functionality of {@link RouterAsyncNamenodeProtocol}. 
+ */ +public class TestRouterAsyncNamenodeProtocol extends RouterAsyncProtocolTestBase { + + private RouterAsyncNamenodeProtocol asyncNamenodeProtocol; + private RouterNamenodeProtocol namenodeProtocol; + + @Before + public void setup() throws Exception { + asyncNamenodeProtocol = new RouterAsyncNamenodeProtocol(getRouterAsyncRpcServer()); + namenodeProtocol = new RouterNamenodeProtocol(getRouterRpcServer()); + } + + @Test + public void getBlocks() throws Exception { + DatanodeInfo[] dns = getRouter().getClient() + .getNamenode().getDatanodeReport(HdfsConstants.DatanodeReportType.ALL); + + DatanodeInfo dn0 = dns[0]; + asyncNamenodeProtocol.getBlocks(dn0, 1024, 0, 0, + null); + BlocksWithLocations asyncRouterBlockLocations = syncReturn(BlocksWithLocations.class); + assertNotNull(asyncRouterBlockLocations); + + BlocksWithLocations syncRouterBlockLocations = namenodeProtocol.getBlocks(dn0, 1024, + 0, 0, null); + + BlockWithLocations[] asyncRouterBlocks = asyncRouterBlockLocations.getBlocks(); + BlockWithLocations[] syncRouterBlocks = syncRouterBlockLocations.getBlocks(); + + assertEquals(asyncRouterBlocks.length, syncRouterBlocks.length); + for (int i = 0; i < syncRouterBlocks.length; i++) { + assertEquals( + asyncRouterBlocks[i].getBlock().getBlockId(), + syncRouterBlocks[i].getBlock().getBlockId()); + } + } + + @Test + public void getBlockKeys() throws Exception { + asyncNamenodeProtocol.getBlockKeys(); + ExportedBlockKeys asyncBlockKeys = syncReturn(ExportedBlockKeys.class); + assertNotNull(asyncBlockKeys); + + ExportedBlockKeys syncBlockKeys = namenodeProtocol.getBlockKeys(); + compareBlockKeys(asyncBlockKeys, syncBlockKeys); + } + + @Test + public void getTransactionID() throws Exception { + asyncNamenodeProtocol.getTransactionID(); + long asyncTransactionID = syncReturn(Long.class); + assertNotNull(asyncTransactionID); + + long transactionID = namenodeProtocol.getTransactionID(); + assertEquals(asyncTransactionID, transactionID); + } + + @Test + public void getMostRecentCheckpointTxId() throws Exception { + asyncNamenodeProtocol.getMostRecentCheckpointTxId(); + long asyncMostRecentCheckpointTxId = syncReturn(Long.class); + assertNotNull(asyncMostRecentCheckpointTxId); + + long mostRecentCheckpointTxId = namenodeProtocol.getMostRecentCheckpointTxId(); + assertEquals(asyncMostRecentCheckpointTxId, mostRecentCheckpointTxId); + } + + @Test + public void versionRequest() throws Exception { + asyncNamenodeProtocol.versionRequest(); + NamespaceInfo asyncNamespaceInfo = syncReturn(NamespaceInfo.class); + assertNotNull(asyncNamespaceInfo); + NamespaceInfo syncNamespaceInfo = namenodeProtocol.versionRequest(); + compareVersion(asyncNamespaceInfo, syncNamespaceInfo); + } + + private void compareBlockKeys( + ExportedBlockKeys blockKeys, ExportedBlockKeys otherBlockKeys) { + assertEquals(blockKeys.getCurrentKey(), otherBlockKeys.getCurrentKey()); + assertEquals(blockKeys.getKeyUpdateInterval(), otherBlockKeys.getKeyUpdateInterval()); + assertEquals(blockKeys.getTokenLifetime(), otherBlockKeys.getTokenLifetime()); + } + + private void compareVersion(NamespaceInfo version, NamespaceInfo otherVersion) { + assertEquals(version.getBlockPoolID(), otherVersion.getBlockPoolID()); + assertEquals(version.getNamespaceID(), otherVersion.getNamespaceID()); + assertEquals(version.getClusterID(), otherVersion.getClusterID()); + assertEquals(version.getLayoutVersion(), otherVersion.getLayoutVersion()); + assertEquals(version.getCTime(), otherVersion.getCTime()); + } +} \ No newline at end of file diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncUserProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncUserProtocol.java new file mode 100644 index 00000000000..a3fcd6109e5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncUserProtocol.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.async; + +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.junit.Assert.assertArrayEquals; + +/** + * Used to test the functionality of {@link RouterAsyncUserProtocol}. + */ +public class TestRouterAsyncUserProtocol extends RouterAsyncProtocolTestBase { + + private RouterAsyncUserProtocol asyncUserProtocol; + + @Before + public void setup() throws Exception { + asyncUserProtocol = new RouterAsyncUserProtocol(getRouterAsyncRpcServer()); + } + + @Test + public void testgetGroupsForUser() throws Exception { + String[] group = new String[] {"bar", "group2"}; + UserGroupInformation.createUserForTesting("user", + new String[] {"bar", "group2"}); + asyncUserProtocol.getGroupsForUser("user"); + String[] result = syncReturn(String[].class); + assertArrayEquals(group, result); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org