This is an automated email from the ASF dual-hosted git repository. hexiaoqiao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 32dad2039ad HDFS-17733. [ARR] Optimize isMultiDestDirectory method using AsyncUtil class (#7415). Contributed by hfutatzhanghb. 32dad2039ad is described below commit 32dad2039ad8db94b413de65f3b4ef44f9b7791c Author: hfutatzhanghb <hfutzhan...@163.com> AuthorDate: Mon Mar 3 11:28:50 2025 +0800 HDFS-17733. [ARR] Optimize isMultiDestDirectory method using AsyncUtil class (#7415). Contributed by hfutatzhanghb. Reviewed-by: Jian Zhang <keeprom...@apache.org> Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org> --- .../federation/router/RouterClientProtocol.java | 2 +- .../router/async/RouterAsyncClientProtocol.java | 28 ++-- ...erRPCMultipleDestinationMountTableResolver.java | 21 +-- ...ncRPCMultipleDestinationMountTableResolver.java | 152 +++++++++++++++++++++ 4 files changed, 178 insertions(+), 25 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index cab4fad1909..f86905cacfb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -2448,7 +2448,7 @@ protected static boolean shouldAddMountPoint( * @throws IOException if unable to get the file status. */ @VisibleForTesting - protected boolean isMultiDestDirectory(String src) throws IOException { + public boolean isMultiDestDirectory(String src) throws IOException { try { if (rpcServer.isPathAll(src)) { List<RemoteLocation> locations; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java index f991b27b029..f00ae63b902 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.federation.router.async; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.ContentSummary; @@ -85,7 +86,6 @@ import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry; -import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.getCompletableFuture; /** * Module that implements all the async RPC calls in {@link ClientProtocol} in the @@ -1055,11 +1055,11 @@ public boolean setReplication(String src, short replication) * @param src the source path * @return true if the path is directory and is supposed to be present in all * subclusters else false in all other scenarios. - * @throws IOException if unable to get the file status. */ + @VisibleForTesting @Override - public boolean isMultiDestDirectory(String src) throws IOException { - try { + public boolean isMultiDestDirectory(String src) { + asyncTry(() -> { if (rpcServer.isPathAll(src)) { List<RemoteLocation> locations; locations = rpcServer.getLocationsForPath(src, false, false); @@ -1067,23 +1067,23 @@ public boolean isMultiDestDirectory(String src) throws IOException { new Class<?>[] {String.class}, new RemoteParam()); rpcClient.invokeSequential(locations, method, HdfsFileStatus.class, null); - CompletableFuture<Object> completableFuture = getCompletableFuture(); - completableFuture = completableFuture.thenApply(o -> { - HdfsFileStatus fileStatus = (HdfsFileStatus) o; + asyncApply((ApplyFunction<HdfsFileStatus, Boolean>) fileStatus -> { if (fileStatus != null) { return fileStatus.isDirectory(); } else { LOG.debug("The destination {} doesn't exist.", src); + return false; } - return false; }); - asyncCompleteWith(completableFuture); - return asyncReturn(Boolean.class); + } else { + asyncComplete(false); } - } catch (UnresolvedPathException e) { + }); + asyncCatch((CatchFunction<Object, UnresolvedPathException>) (o, e) -> { LOG.debug("The destination {} is a symlink.", src); - } - asyncCompleteWith(CompletableFuture.completedFuture(false)); - return asyncReturn(Boolean.class); + return false; + }, UnresolvedPathException.class); + + return asyncReturn(boolean.class); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java index cbc11b27b2b..78daa8cdfe7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java @@ -75,17 +75,18 @@ /** * Tests router rpc with multiple destination mount table resolver. */ +@SuppressWarnings("checkstyle:visibilitymodifier") public class TestRouterRPCMultipleDestinationMountTableResolver { - private static final List<String> NS_IDS = Arrays.asList("ns0", "ns1", "ns2"); + protected static final List<String> NS_IDS = Arrays.asList("ns0", "ns1", "ns2"); - private static StateStoreDFSCluster cluster; - private static RouterContext routerContext; - private static MountTableResolver resolver; - private static DistributedFileSystem nnFs0; - private static DistributedFileSystem nnFs1; - private static DistributedFileSystem nnFs2; - private static DistributedFileSystem routerFs; - private static RouterRpcServer rpcServer; + protected static StateStoreDFSCluster cluster; + protected static RouterContext routerContext; + protected static MountTableResolver resolver; + protected static DistributedFileSystem nnFs0; + protected static DistributedFileSystem nnFs1; + protected static DistributedFileSystem nnFs2; + protected static DistributedFileSystem routerFs; + protected static RouterRpcServer rpcServer; @BeforeClass public static void setUp() throws Exception { @@ -408,7 +409,7 @@ private boolean verifyDirectoryLevelInvocations(boolean dirAll, * @return If it was successfully added. * @throws IOException + * Problems adding entries. */ - private boolean addMountTable(final MountTable entry) throws IOException { + protected boolean addMountTable(final MountTable entry) throws IOException { RouterClient client = routerContext.getAdminClient(); MountTableManager mountTableManager = client.getMountTableManager(); AddMountTableEntryRequest addRequest = diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRPCMultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRPCMultipleDestinationMountTableResolver.java new file mode 100644 index 00000000000..517d01f3a30 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRPCMultipleDestinationMountTableResolver.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; +import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol; +import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage; +import org.apache.hadoop.hdfs.server.federation.router.TestRouterRPCMultipleDestinationMountTableResolver; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests router async rpc with multiple destination mount table resolver. + */ +public class TestRouterAsyncRPCMultipleDestinationMountTableResolver extends + TestRouterRPCMultipleDestinationMountTableResolver { + + @BeforeClass + public static void setUp() throws Exception { + + // Build and start a federated cluster. + cluster = new StateStoreDFSCluster(false, 3, + MultipleDestinationMountTableResolver.class); + Configuration routerConf = + new RouterConfigBuilder().stateStore().admin().quota().rpc().build(); + routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); + + Configuration hdfsConf = new Configuration(false); + hdfsConf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); + + cluster.addRouterOverrides(routerConf); + cluster.addNamenodeOverrides(hdfsConf); + cluster.startCluster(); + cluster.startRouters(); + cluster.waitClusterUp(); + + routerContext = cluster.getRandomRouter(); + resolver = + (MountTableResolver) routerContext.getRouter().getSubclusterResolver(); + nnFs0 = (DistributedFileSystem) cluster + .getNamenode(cluster.getNameservices().get(0), null).getFileSystem(); + nnFs1 = (DistributedFileSystem) cluster + .getNamenode(cluster.getNameservices().get(1), null).getFileSystem(); + nnFs2 = (DistributedFileSystem) cluster + .getNamenode(cluster.getNameservices().get(2), null).getFileSystem(); + routerFs = (DistributedFileSystem) routerContext.getFileSystem(); + rpcServer =routerContext.getRouter().getRpcServer(); + } + + @Override + @Test + public void testInvokeAtAvailableNs() throws IOException { + // Create a mount point with multiple destinations. + Path path = new Path("/testInvokeAtAvailableNs"); + Map<String, String> destMap = new HashMap<>(); + destMap.put("ns0", "/testInvokeAtAvailableNs"); + destMap.put("ns1", "/testInvokeAtAvailableNs"); + nnFs0.mkdirs(path); + nnFs1.mkdirs(path); + MountTable addEntry = + MountTable.newInstance("/testInvokeAtAvailableNs", destMap); + addEntry.setQuota(new RouterQuotaUsage.Builder().build()); + addEntry.setDestOrder(DestinationOrder.RANDOM); + addEntry.setFaultTolerant(true); + assertTrue(addMountTable(addEntry)); + + // Make one subcluster unavailable. + MiniDFSCluster dfsCluster = cluster.getCluster(); + dfsCluster.shutdownNameNode(0); + dfsCluster.shutdownNameNode(1); + try { + // Verify that #invokeAtAvailableNs works by calling #getServerDefaults. + RemoteMethod method = new RemoteMethod("getServerDefaults"); + FsServerDefaults serverDefaults = null; + rpcServer.invokeAtAvailableNsAsync(method, FsServerDefaults.class); + try { + serverDefaults = syncReturn(FsServerDefaults.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + assertNotNull(serverDefaults); + } finally { + dfsCluster.restartNameNode(0); + dfsCluster.restartNameNode(1); + } + } + + @Override + @Test + public void testIsMultiDestDir() throws Exception { + RouterClientProtocol client = + routerContext.getRouter().getRpcServer().getClientProtocolModule(); + setupOrderMountPath(DestinationOrder.HASH_ALL); + // Should be true only for directory and false for all other cases. + client.isMultiDestDirectory("/mount/dir"); + assertTrue(syncReturn(boolean.class)); + client.isMultiDestDirectory("/mount/nodir"); + assertFalse(syncReturn(boolean.class)); + client.isMultiDestDirectory("/mount/dir/file"); + assertFalse(syncReturn(boolean.class)); + routerFs.createSymlink(new Path("/mount/dir/file"), + new Path("/mount/dir/link"), true); + client.isMultiDestDirectory("/mount/dir/link"); + assertFalse(syncReturn(boolean.class)); + routerFs.createSymlink(new Path("/mount/dir/dir"), + new Path("/mount/dir/linkDir"), true); + client.isMultiDestDirectory("/mount/dir/linkDir"); + assertFalse(syncReturn(boolean.class)); + resetTestEnvironment(); + // Test single directory destination. Should be false for the directory. + setupOrderMountPath(DestinationOrder.HASH); + client.isMultiDestDirectory("/mount/dir"); + assertFalse(syncReturn(boolean.class)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org