This is an automated email from the ASF dual-hosted git repository. hexiaoqiao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 011e8af9dae067deac1658e2aaae5e683447281f Author: WangQixiang <72643258+archie-w...@users.noreply.github.com> AuthorDate: Tue Oct 15 21:54:25 2024 +0800 HDFS-17594. [ARR] RouterCacheAdmin supports asynchronous rpc. (#6986). Contributed by Archie73. Reviewed-by: Jian Zhang <keeprom...@apache.org> Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org> --- .../federation/router/RouterAsyncCacheAdmin.java | 75 ++++++++ .../server/federation/router/RouterCacheAdmin.java | 36 ++-- .../router/TestRouterAsyncCacheAdmin.java | 195 +++++++++++++++++++++ 3 files changed, 294 insertions(+), 12 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncCacheAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncCacheAdmin.java new file mode 100644 index 00000000000..fca43e15879 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncCacheAdmin.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.fs.CacheFlag; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.CachePoolEntry; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; + +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; + +/** + * Module that implements all the asynchronous RPC calls in + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} related to Cache Admin + * in the {@link RouterRpcServer}. 
+ */ +public class RouterAsyncCacheAdmin extends RouterCacheAdmin { + + public RouterAsyncCacheAdmin(RouterRpcServer server) { + super(server); + } + + @Override + public long addCacheDirective( + CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException { + invokeAddCacheDirective(path, flags); + asyncApply((ApplyFunction<Map<RemoteLocation, Long>, Long>) + response -> response.values().iterator().next()); + return asyncReturn(Long.class); + } + + @Override + public BatchedEntries<CacheDirectiveEntry> listCacheDirectives( + long prevId, CacheDirectiveInfo filter) throws IOException { + invokeListCacheDirectives(prevId, filter); + asyncApply((ApplyFunction<Map, + BatchedEntries<CacheDirectiveEntry>>) + response -> (BatchedEntries<CacheDirectiveEntry>) response.values().iterator().next()); + return asyncReturn(BatchedEntries.class); + } + + @Override + public BatchedEntries<CachePoolEntry> listCachePools(String prevKey) throws IOException { + invokeListCachePools(prevKey); + asyncApply((ApplyFunction<Map<FederationNamespaceInfo, BatchedEntries>, + BatchedEntries<CachePoolEntry>>) + results -> results.values().iterator().next()); + return asyncReturn(BatchedEntries.class); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterCacheAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterCacheAdmin.java index e25d8b269df..f2dd5dc1827 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterCacheAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterCacheAdmin.java @@ -59,15 +59,19 @@ public RouterCacheAdmin(RouterRpcServer server) { public long addCacheDirective(CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException { + Map<RemoteLocation, Long> response = invokeAddCacheDirective(path, 
flags); + return response.values().iterator().next(); + } + + protected Map<RemoteLocation, Long> invokeAddCacheDirective( + CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true); final List<RemoteLocation> locations = rpcServer.getLocationsForPath(path.getPath().toString(), true, false); RemoteMethod method = new RemoteMethod("addCacheDirective", new Class<?>[] {CacheDirectiveInfo.class, EnumSet.class}, new RemoteParam(getRemoteMap(path, locations)), flags); - Map<RemoteLocation, Long> response = - rpcClient.invokeConcurrent(locations, method, false, false, long.class); - return response.values().iterator().next(); + return rpcClient.invokeConcurrent(locations, method, false, false, long.class); } public void modifyCacheDirective(CacheDirectiveInfo directive, @@ -100,6 +104,12 @@ public void removeCacheDirective(long id) throws IOException { public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(long prevId, CacheDirectiveInfo filter) throws IOException { + Map<?, BatchedEntries> results = invokeListCacheDirectives(prevId, filter); + return (BatchedEntries<CacheDirectiveEntry>) results.values().iterator().next(); + } + + protected Map<?, BatchedEntries> invokeListCacheDirectives(long prevId, + CacheDirectiveInfo filter) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ, true); if (filter.getPath() != null) { final List<RemoteLocation> locations = rpcServer @@ -107,17 +117,15 @@ public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(long prevId, RemoteMethod method = new RemoteMethod("listCacheDirectives", new Class<?>[] {long.class, CacheDirectiveInfo.class}, prevId, new RemoteParam(getRemoteMap(filter, locations))); - Map<RemoteLocation, BatchedEntries> response = rpcClient.invokeConcurrent( + return rpcClient.invokeConcurrent( locations, method, false, false, BatchedEntries.class); - return response.values().iterator().next(); } RemoteMethod method = new
RemoteMethod("listCacheDirectives", new Class<?>[] {long.class, CacheDirectiveInfo.class}, prevId, filter); Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); - Map<FederationNamespaceInfo, BatchedEntries> results = rpcClient - .invokeConcurrent(nss, method, true, false, BatchedEntries.class); - return results.values().iterator().next(); + return rpcClient.invokeConcurrent( + nss, method, true, false, BatchedEntries.class); } public void addCachePool(CachePoolInfo info) throws IOException { @@ -146,13 +154,17 @@ public void removeCachePool(String cachePoolName) throws IOException { public BatchedEntries<CachePoolEntry> listCachePools(String prevKey) throws IOException { + Map<FederationNamespaceInfo, BatchedEntries> results = invokeListCachePools(prevKey); + return results.values().iterator().next(); + } + + protected Map<FederationNamespaceInfo, BatchedEntries> invokeListCachePools( + String prevKey) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ, true); RemoteMethod method = new RemoteMethod("listCachePools", new Class<?>[] {String.class}, prevKey); Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); - Map<FederationNamespaceInfo, BatchedEntries> results = rpcClient - .invokeConcurrent(nss, method, true, false, BatchedEntries.class); - return results.values().iterator().next(); + return rpcClient.invokeConcurrent(nss, method, true, false, BatchedEntries.class); } /** @@ -161,7 +173,7 @@ public BatchedEntries<CachePoolEntry> listCachePools(String prevKey) * @param locations the locations to map. * @return map with CacheDirectiveInfo mapped to the locations.
*/ - private Map<RemoteLocation, CacheDirectiveInfo> getRemoteMap( + protected Map<RemoteLocation, CacheDirectiveInfo> getRemoteMap( CacheDirectiveInfo path, final List<RemoteLocation> locations) { final Map<RemoteLocation, CacheDirectiveInfo> dstMap = new HashMap<>(); Iterator<RemoteLocation> iterator = locations.iterator(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncCacheAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncCacheAdmin.java new file mode 100644 index 00000000000..4ad5ccbfee6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncCacheAdmin.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CacheFlag; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.CachePoolEntry; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.ipc.CallerContext; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestRouterAsyncCacheAdmin { + private static Configuration routerConf; + /** Federated HDFS cluster. */ + private static MiniRouterDFSCluster cluster; + private static String ns0; + + /** Random Router for this federated cluster.
*/ + private MiniRouterDFSCluster.RouterContext router; + private FileSystem routerFs; + private RouterRpcServer routerRpcServer; + private RouterAsyncCacheAdmin asyncCacheAdmin; + + @BeforeClass + public static void setUpCluster() throws Exception { + cluster = new MiniRouterDFSCluster(true, 1, 2, + DEFAULT_HEARTBEAT_INTERVAL_MS, 1000); + cluster.setNumDatanodesPerNameservice(3); + + cluster.startCluster(); + + // Making one Namenode active per nameservice + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + cluster.switchToStandby(ns, NAMENODES[1]); + } + } + // Configure the routers with only an RPC service + routerConf = new RouterConfigBuilder() + .rpc() + .build(); + + // Reduce the number of RPC client threads so the Router is easy to overload + routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1); + routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1); + routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1); + // We decrease the DN cache times to make the test faster + routerConf.setTimeDuration( + RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); + cluster.addRouterOverrides(routerConf); + // Start the routers with the overridden configuration + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + cluster.waitActiveNamespaces(); + ns0 = cluster.getNameservices().get(0); + } + + @AfterClass + public static void shutdownCluster() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Before + public void setUp() throws IOException { + router = cluster.getRandomRouter(); + routerFs = router.getFileSystem(); + routerRpcServer = router.getRouterRpcServer(); + routerRpcServer.initAsyncThreadPool(); + RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient( + routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
routerRpcServer.getRPCMonitor(), + routerRpcServer.getRouterStateIdContext()); + RouterRpcServer spy = Mockito.spy(routerRpcServer); + Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient); + asyncCacheAdmin = new RouterAsyncCacheAdmin(spy); + + // Create mock locations + MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver(); + resolver.addLocation("/", ns0, "/"); + try (FSDataOutputStream fsDataOutputStream = routerFs.create( + new Path("/testCache.file"), true)) { + fsDataOutputStream.write(new byte[1024]); + } + } + + @After + public void tearDown() throws IOException { + // clear client context + CallerContext.setCurrent(null); + if (routerFs != null) { + // Remove the file created in setUp before closing the client. + assertTrue(routerFs.delete(new Path("/testCache.file"), false)); + routerFs.close(); + } + } + + @Test + public void testRouterAsyncCacheAdmin() throws Exception { + asyncCacheAdmin.addCachePool(new CachePoolInfo("pool")); + syncReturn(null); + + CacheDirectiveInfo path = new CacheDirectiveInfo.Builder(). + setPool("pool"). + setPath(new Path("/testCache.file")). + build(); + asyncCacheAdmin.addCacheDirective(path, EnumSet.of(CacheFlag.FORCE)); + long result = syncReturn(long.class); + assertEquals(1, result); + + asyncCacheAdmin.listCachePools(""); + BatchedEntries<CachePoolEntry> cachePoolEntries = syncReturn(BatchedEntries.class); + assertEquals("pool", cachePoolEntries.get(0).getInfo().getPoolName()); + + CacheDirectiveInfo filter = new CacheDirectiveInfo.Builder(). + setPool("pool").
+ build(); + asyncCacheAdmin.listCacheDirectives(0, filter); + BatchedEntries<CacheDirectiveEntry> cacheDirectiveEntries = syncReturn(BatchedEntries.class); + assertEquals(new Path("/testCache.file"), cacheDirectiveEntries.get(0).getInfo().getPath()); + + CachePoolInfo pool = new CachePoolInfo("pool").setOwnerName("pool_user"); + asyncCacheAdmin.modifyCachePool(pool); + syncReturn(null); + + asyncCacheAdmin.listCachePools(""); + cachePoolEntries = syncReturn(BatchedEntries.class); + assertEquals("pool_user", cachePoolEntries.get(0).getInfo().getOwnerName()); + + path = new CacheDirectiveInfo.Builder(). + setPool("pool"). + setPath(new Path("/testCache.file")). + setReplication((short) 2). + setId(result). + build(); + asyncCacheAdmin.modifyCacheDirective(path, EnumSet.of(CacheFlag.FORCE)); + syncReturn(null); + + asyncCacheAdmin.listCacheDirectives(0, filter); + cacheDirectiveEntries = syncReturn(BatchedEntries.class); + assertEquals(Short.valueOf((short) 2), cacheDirectiveEntries.get(0).getInfo().getReplication()); + + asyncCacheAdmin.removeCacheDirective(result); + syncReturn(null); + asyncCacheAdmin.removeCachePool("pool"); + syncReturn(null); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org