This is an automated email from the ASF dual-hosted git repository. hexiaoqiao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 198288c5c8f31755dc591f2d83916e298b2f12a6 Author: hfutatzhanghb <hfutzhan...@163.com> AuthorDate: Mon Nov 25 14:41:24 2024 +0800 HDFS-17659. [ARR]Router Quota supports asynchronous rpc. (#7157). Contributed by hfutatzhanghb. Reviewed-by: Jian Zhang <keeprom...@apache.org> --- .../hdfs/server/federation/router/AsyncQuota.java | 87 +++++++++++ .../hdfs/server/federation/router/Quota.java | 6 +- .../router/RouterQuotaUpdateService.java | 10 ++ .../federation/router/TestRouterAsyncQuota.java | 166 +++++++++++++++++++++ 4 files changed, 266 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncQuota.java new file mode 100644 index 00000000000..5d76171a548 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncQuota.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.fs.QuotaUsage; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.namenode.NameNode; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionException; + +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; + +public class AsyncQuota extends Quota { + + /** RPC server to receive client calls. */ + private final RouterRpcServer rpcServer; + /** RPC clients to connect to the Namenodes. */ + private final RouterRpcClient rpcClient; + private final Router router; + + public AsyncQuota(Router router, RouterRpcServer server) { + super(router, server); + this.router = router; + this.rpcServer = server; + this.rpcClient = this.rpcServer.getRPCClient(); + } + + /** + * Async get aggregated quota usage for the federation path. + * @param path Federation path. + * @return Aggregated quota. + * @throws IOException If the quota system is disabled. + */ + public QuotaUsage getQuotaUsage(String path) throws IOException { + getEachQuotaUsage(path); + + asyncApply(o -> { + Map<RemoteLocation, QuotaUsage> results = (Map<RemoteLocation, QuotaUsage>) o; + try { + return aggregateQuota(path, results); + } catch (IOException e) { + throw new CompletionException(e); + } + }); + return asyncReturn(QuotaUsage.class); + } + + /** + * Get quota usage for the federation path. + * @param path Federation path. + * @return quota usage for each remote location. + * @throws IOException If the quota system is disabled. 
+ */ + Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + if (!router.isQuotaEnabled()) { + throw new IOException("The quota system is disabled in Router."); + } + + final List<RemoteLocation> quotaLocs = getValidQuotaLocations(path); + RemoteMethod method = new RemoteMethod("getQuotaUsage", + new Class<?>[] {String.class}, new RemoteParam()); + rpcClient.invokeConcurrent( + quotaLocs, method, true, false, QuotaUsage.class); + return asyncReturn(Map.class); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java index e19e51b5733..f28af6afa7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java @@ -213,9 +213,9 @@ private boolean isMountEntry(String path) { * method will do some additional filtering. * @param path Federation path. * @return List of valid quota remote locations. - * @throws IOException + * @throws IOException If the location for this path cannot be determined. */ - private List<RemoteLocation> getValidQuotaLocations(String path) + protected List<RemoteLocation> getValidQuotaLocations(String path) throws IOException { final List<RemoteLocation> locations = getQuotaRemoteLocations(path); @@ -359,7 +359,7 @@ public static boolean andByStorageType(Predicate<StorageType> predicate) { * federation path. * @param path Federation path. * @return List of quota remote locations. - * @throws IOException + * @throws IOException If the location for this path cannot be determined. 
*/ private List<RemoteLocation> getQuotaRemoteLocations(String path) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java index e9b780d5bca..235190d2a48 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java @@ -41,6 +41,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; + /** * Service to periodically update the {@link RouterQuotaUsage} * cached information in the {@link Router}. @@ -99,6 +101,9 @@ protected void periodicInvoke() { // This is because mount table does not have mtime. 
// For other mount entry get current quota usage HdfsFileStatus ret = this.rpcServer.getFileInfo(src); + if (rpcServer.isAsync()) { + ret = syncReturn(HdfsFileStatus.class); + } if (ret == null || ret.getModificationTime() == 0) { long[] zeroConsume = new long[StorageType.values().length]; currentQuotaUsage = @@ -113,6 +118,9 @@ protected void periodicInvoke() { Quota quotaModule = this.rpcServer.getQuotaModule(); Map<RemoteLocation, QuotaUsage> usageMap = quotaModule.getEachQuotaUsage(src); + if (this.rpcServer.isAsync()) { + usageMap = (Map<RemoteLocation, QuotaUsage>)syncReturn(Map.class); + } currentQuotaUsage = quotaModule.aggregateQuota(src, usageMap); remoteQuotaUsage.putAll(usageMap); } catch (IOException ioe) { @@ -136,6 +144,8 @@ protected void periodicInvoke() { } } catch (IOException e) { LOG.error("Quota cache updated error.", e); + } catch (Exception e) { + LOG.error(e.toString()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncQuota.java new file mode 100644 index 00000000000..0b1eeeec0be --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncQuota.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.QuotaUsage; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.ipc.CallerContext; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.junit.Assert.assertTrue; + +public class TestRouterAsyncQuota { + 
private static Configuration routerConf; + /** Federated HDFS cluster. */ + private static MiniRouterDFSCluster cluster; + private static String ns0; + + /** Random Router for this federated cluster. */ + private MiniRouterDFSCluster.RouterContext router; + private FileSystem routerFs; + private RouterRpcServer routerRpcServer; + private AsyncQuota asyncQuota; + + private final String testfilePath = "/testdir/testAsyncQuota.file"; + + @BeforeClass + public static void setUpCluster() throws Exception { + cluster = new MiniRouterDFSCluster(true, 1, 2, + DEFAULT_HEARTBEAT_INTERVAL_MS, 1000); + cluster.setNumDatanodesPerNameservice(3); + cluster.setRacks( + new String[] {"/rack1", "/rack2", "/rack3"}); + cluster.startCluster(); + + // Making one Namenode active per nameservice + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + cluster.switchToStandby(ns, NAMENODES[1]); + } + } + // Start routers with only an RPC service + routerConf = new RouterConfigBuilder() + .rpc() + .quota(true) + .build(); + + // Reduce the number of RPC clients threads to overload the Router easy + routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1); + routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1); + routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1); + // We decrease the DN cache times to make the test faster + routerConf.setTimeDuration( + RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); + routerConf.setBoolean(DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY, true); + cluster.addRouterOverrides(routerConf); + // Start routers with only an RPC service + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + cluster.waitActiveNamespaces(); + ns0 = cluster.getNameservices().get(0); + } + + @AfterClass + public static void shutdownCluster() throws Exception { + if (cluster != null) { + 
+ cluster.shutdown(); + } + } + + @Before + public void setUp() throws IOException { + router = cluster.getRandomRouter(); + routerFs = router.getFileSystem(); + routerRpcServer = router.getRouterRpcServer(); + routerRpcServer.initAsyncThreadPool(); + RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient( + routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), + routerRpcServer.getRPCMonitor(), + routerRpcServer.getRouterStateIdContext()); + RouterRpcServer spy = Mockito.spy(routerRpcServer); + Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient); + asyncQuota = new AsyncQuota(router.getRouter(), spy); + + // Create mock locations + MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver(); + resolver.addLocation("/", ns0, "/"); + FsPermission permission = new FsPermission("705"); + routerFs.mkdirs(new Path("/testdir"), permission); + FSDataOutputStream fsDataOutputStream = routerFs.create( + new Path(testfilePath), true); + fsDataOutputStream.write(new byte[1024]); + fsDataOutputStream.close(); + } + + @After + public void tearDown() throws IOException { + // clear client context + CallerContext.setCurrent(null); + if (routerFs != null) { // still null if setUp failed before assigning it + boolean delete = routerFs.delete(new Path("/testdir")); + assertTrue(delete); + routerFs.close(); + } + } + + @Test + public void testRouterAsyncGetQuotaUsage() throws Exception { + asyncQuota.getQuotaUsage("/testdir"); + QuotaUsage quotaUsage = syncReturn(QuotaUsage.class); + // 3-replication. + Assert.assertEquals(3 * 1024, quotaUsage.getSpaceConsumed()); + // We have one directory and one file.
+ Assert.assertEquals(2, quotaUsage.getFileAndDirectoryCount()); + } + + @Test + public void testRouterAsyncSetQuotaUsage() throws Exception { + asyncQuota.setQuota("/testdir", Long.MAX_VALUE, 8096, StorageType.DISK, false); + syncReturn(void.class); + asyncQuota.getQuotaUsage("/testdir"); + QuotaUsage quotaUsage = syncReturn(QuotaUsage.class); + Assert.assertEquals(8096, quotaUsage.getTypeQuota(StorageType.DISK)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org