This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 7ab88fe2626 HDFS-17744. [ARR] getEnclosingRoot RPC adapts to async rpc. (#7445). Contributed by hfutatzhanghb.
7ab88fe2626 is described below

commit 7ab88fe262602556be6c768759ce9e605170e9c0
Author: hfutatzhanghb <hfutzhan...@163.com>
AuthorDate: Fri Mar 7 17:41:37 2025 +0800

    HDFS-17744. [ARR] getEnclosingRoot RPC adapts to async rpc. (#7445). Contributed by hfutatzhanghb.

    Reviewed-by: Jian Zhang <keeprom...@apache.org>
    Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org>
---
 .../router/async/RouterAsyncClientProtocol.java    |  38 +++++
 .../router/async/TestRouterAsyncMountTable.java    | 158 +++++++++++++++++++++
 2 files changed, 196 insertions(+)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java
index f00ae63b902..c12249d16b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
 import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
 import org.apache.hadoop.hdfs.server.federation.router.NoLocationException;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
 import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
 import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
@@ -104,6 +106,8 @@ public class RouterAsyncClientProtocol extends RouterClientProtocol {
   private final boolean allowPartialList;
   /** Time out when getting the mount statistics. */
   private long mountStatusTimeOut;
+  /** Default nameservice enabled. */
+  private final boolean defaultNameServiceEnabled;
   /** Identifier for the super user. */
   private String superUser;
   /** Identifier for the super group. */
@@ -126,6 +130,9 @@ public RouterAsyncClientProtocol(Configuration conf, RouterRpcServer rpcServer)
     this.mountStatusTimeOut = getMountStatusTimeOut();
     this.superUser = getSuperUser();
     this.superGroup = getSuperGroup();
+    this.defaultNameServiceEnabled = conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE,
+        RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE_DEFAULT);
   }
 
   @Override
@@ -1086,4 +1093,35 @@ public boolean isMultiDestDirectory(String src) {
     return asyncReturn(boolean.class);
   }
 
+
+  @Override
+  public Path getEnclosingRoot(String src) throws IOException {
+    final Path[] mountPath = new Path[1];
+    if (defaultNameServiceEnabled) {
+      mountPath[0] = new Path("/");
+    }
+
+    if (subclusterResolver instanceof MountTableResolver) {
+      MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
+      if (mountTable.getMountPoint(src) != null) {
+        mountPath[0] = new Path(mountTable.getMountPoint(src).getSourcePath());
+      }
+    }
+
+    if (mountPath[0] == null) {
+      throw new IOException(String.format("No mount point for %s", src));
+    }
+
+    getEZForPath(src);
+    asyncApply((ApplyFunction<EncryptionZone, Path>)zone -> {
+      if (zone == null) {
+        return mountPath[0];
+      } else {
+        Path zonePath = new Path(zone.getPath());
+        return zonePath.depth() > mountPath[0].depth() ? zonePath : mountPath[0];
+      }
+    });
+    return asyncReturn(Path.class);
+  }
+
 }
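The selection rule at the heart of getEnclosingRoot above is easy to restate in isolation: the enclosing root is the mount point covering the path (or "/" when the default nameservice is enabled), unless the path sits inside an encryption zone whose root is nested deeper, in which case the zone root wins. A minimal stand-alone sketch of that rule follows; the class name, helper name, and example paths are illustrative and not part of the patch, while Path#depth() and the comparison itself come straight from the code above:

import org.apache.hadoop.fs.Path;

public class EnclosingRootRule {

  /**
   * Stand-alone restatement of the rule implemented by getEnclosingRoot:
   * prefer the encryption-zone root when it is nested deeper than the
   * mount point. ezRoot may be null when the path is not inside a zone.
   */
  static Path enclosingRoot(Path mountPoint, Path ezRoot) {
    if (ezRoot == null) {
      return mountPoint;
    }
    return ezRoot.depth() > mountPoint.depth() ? ezRoot : mountPoint;
  }

  public static void main(String[] args) {
    // Hypothetical mount point with no encryption zone: the mount point wins.
    System.out.println(enclosingRoot(new Path("/regular"), null));
    // Hypothetical zone nested under the mount point: the zone root wins.
    System.out.println(enclosingRoot(new Path("/regular"), new Path("/regular/ez")));
  }
}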
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncMountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncMountTable.java
new file mode 100644
index 00000000000..177f6586079
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncMountTable.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test a router end-to-end including the MountTable using async rpc.
+ */
+public class TestRouterAsyncMountTable {
+  public static final Logger LOG = LoggerFactory.getLogger(TestRouterAsyncMountTable.class);
+
+  private static StateStoreDFSCluster cluster;
+  private static MiniRouterDFSCluster.NamenodeContext nnContext0;
+  private static MiniRouterDFSCluster.NamenodeContext nnContext1;
+  private static MiniRouterDFSCluster.RouterContext routerContext;
+  private static MountTableResolver mountTable;
+  private static FileSystem routerFs;
+
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+    // Build and start a federated cluster.
+    cluster = new StateStoreDFSCluster(false, 2);
+    Configuration conf = new RouterConfigBuilder()
+        .stateStore()
+        .admin()
+        .rpc()
+        .build();
+    conf.setInt(RBFConfigKeys.DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_KEY, 20);
+    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
+    cluster.addRouterOverrides(conf);
+    cluster.startCluster();
+    cluster.startRouters();
+    cluster.waitClusterUp();
+
+    // Get the end points.
+    nnContext0 = cluster.getNamenode("ns0", null);
+    nnContext1 = cluster.getNamenode("ns1", null);
+    routerContext = cluster.getRandomRouter();
+    routerFs = routerContext.getFileSystem();
+    Router router = routerContext.getRouter();
+    mountTable = (MountTableResolver) router.getSubclusterResolver();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (cluster != null) {
+      cluster.stopRouter(routerContext);
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @After
+  public void clearMountTable() throws IOException {
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTableManager = client.getMountTableManager();
+    GetMountTableEntriesRequest req1 =
+        GetMountTableEntriesRequest.newInstance("/");
+    GetMountTableEntriesResponse response =
+        mountTableManager.getMountTableEntries(req1);
+    for (MountTable entry : response.getEntries()) {
+      RemoveMountTableEntryRequest req2 =
+          RemoveMountTableEntryRequest.newInstance(entry.getSourcePath());
+      mountTableManager.removeMountTableEntry(req2);
+    }
+    mountTable.setDefaultNSEnable(true);
+  }
+
+  /**
+   * Add a mount table entry to the mount table through the admin API.
+   * @param entry Mount table entry to add.
+   * @return If it was successfully added.
+   * @throws IOException Problems adding entries.
+   */
+  private boolean addMountTable(final MountTable entry) throws IOException {
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTableManager = client.getMountTableManager();
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(entry);
+    AddMountTableEntryResponse addResponse =
+        mountTableManager.addMountTableEntry(addRequest);
+
+    // Reload the Router cache.
+    mountTable.loadCache(true);
+
+    return addResponse.getStatus();
+  }
+
+  @Test
+  public void testGetEnclosingRoot() throws Exception {
+
+    // Add a read only entry.
+    MountTable readOnlyEntry = MountTable.newInstance(
+        "/readonly", Collections.singletonMap("ns0", "/testdir"));
+    readOnlyEntry.setReadOnly(true);
+    assertTrue(addMountTable(readOnlyEntry));
+    assertEquals(routerFs.getEnclosingRoot(new Path("/readonly")), new Path("/readonly"));
+
+    assertEquals(routerFs.getEnclosingRoot(new Path("/regular")), new Path("/"));
+    assertEquals(routerFs.getEnclosingRoot(new Path("/regular")),
+        routerFs.getEnclosingRoot(routerFs.getEnclosingRoot(new Path("/regular"))));
+
+    // Add a regular entry.
+    MountTable regularEntry = MountTable.newInstance(
+        "/regular", Collections.singletonMap("ns0", "/testdir"));
+    assertTrue(addMountTable(regularEntry));
+    assertEquals(routerFs.getEnclosingRoot(new Path("/regular")), new Path("/regular"));
+
+    // Path does not need to exist.
+    assertEquals(routerFs.getEnclosingRoot(new Path("/regular/pathDNE")), new Path("/regular"));
+  }
+}
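From a client's point of view the change is transparent: FileSystem#getEnclosingRoot (the API the test exercises) behaves the same whether the router runs synchronous or asynchronous RPC handlers, the latter enabled via RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY as in the test setup above. A minimal sketch of such a client call; the class name and the hdfs://router-fed:8888 router address are hypothetical, while the /regular mount entry and the nonexistent-path behavior mirror the test:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class EnclosingRootClient {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical router RPC address; substitute your federation's router.
    FileSystem fs = FileSystem.get(URI.create("hdfs://router-fed:8888"), conf);
    // Resolves the enclosing root through the router. As the test shows,
    // the queried path does not need to exist; for a path under the
    // /regular mount entry this prints /regular.
    System.out.println(fs.getEnclosingRoot(new Path("/regular/pathDNE")));
  }
}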
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org