[ https://issues.apache.org/jira/browse/HDFS-16734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17582096#comment-17582096 ]
ASF GitHub Bot commented on HDFS-16734: --------------------------------------- goiri commented on code in PR #4763: URL: https://github.com/apache/hadoop/pull/4763#discussion_r950626646 ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java: ########## @@ -1242,16 +1244,98 @@ public void setBalancerBandwidth(long bandwidth) throws IOException { rpcClient.invokeConcurrent(nss, method, true, false); } + /** + * Recursively get all the locations for the path. + * For example, there are some mount points: + * /a -> ns0 -> /a + * /a/b -> ns1 -> /a/b + * /a/b/c -> ns2 -> /a/b/c + * When the path is '/a', the result of locations should be + * {ns0 -> [RemoteLocation(/a)], ns1 -> [RemoteLocation(/a/b)], ns2 -> [RemoteLocation(/a/b/c)]} + * @param path the path to get the locations. + * @param locations a map to store all the locations and key is namespace id. + * @throws IOException + */ + @VisibleForTesting + void getAllLocations(String path, Map<String, List<RemoteLocation>> locations) + throws IOException { + try { + List<RemoteLocation> parentLocations = + rpcServer.getLocationsForPath(path, false, false); + parentLocations.forEach( + l -> locations.computeIfAbsent(l.getNameserviceId(), k -> new ArrayList<>()).add(l)); + } catch (NoLocationException | RouterResolveException e) { + LOG.debug(""); + } + + final List<String> children = subclusterResolver.getMountPoints(path); + if (children != null) { + for (String child : children) { + Path childPath = new Path(path, child); + getAllLocations(childPath.toUri().getPath(), locations); + } + } + } + + /** + * Get all the locations of the path for {@link this#getContentSummary(String)}. 
+ * For example, there are some mount points: + * /a -> ns0 -> /a + * /a/b -> ns0 -> /a/b + * /a/b/c -> ns1 -> /a/b/c + * When the path is '/a', the result of locations should be + * [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')] + * When the path is '/b', will throw NoLocationException. + * @param path the path to get content summary + * @return one list contains all the remote location + * @throws IOException + */ + @VisibleForTesting + List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException { + final Map<String, List<RemoteLocation>> ns2Locations = new ConcurrentHashMap<>(); + final List<RemoteLocation> locations = new ArrayList<>(); + + // Try to get all the locations of the path. + getAllLocations(path, ns2Locations); + + if (ns2Locations.isEmpty()) { + throw new NoLocationException(path, subclusterResolver.getClass()); + } + + // remove the redundancy remoteLocation order by destination. + ns2Locations.forEach((k, v) -> { + List<RemoteLocation> sortedList = v.stream().sorted().collect(Collectors.toList()); + int size = sortedList.size(); + for (int i = size - 1; i > -1; i--) { + RemoteLocation currentLocation = sortedList.get(i); + if (i - 1 == -1) { + locations.add(currentLocation); + continue; + } + + RemoteLocation preLocation = sortedList.get(i - 1); + if (!currentLocation.getDest().startsWith(preLocation.getDest() + Path.SEPARATOR)) { + locations.add(currentLocation); + } else { + LOG.debug(""); Review Comment: Expand ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java: ########## @@ -1242,16 +1244,98 @@ public void setBalancerBandwidth(long bandwidth) throws IOException { rpcClient.invokeConcurrent(nss, method, true, false); } + /** + * Recursively get all the locations for the path. 
+ * For example, there are some mount points: + * /a -> ns0 -> /a + * /a/b -> ns1 -> /a/b + * /a/b/c -> ns2 -> /a/b/c + * When the path is '/a', the result of locations should be + * {ns0 -> [RemoteLocation(/a)], ns1 -> [RemoteLocation(/a/b)], ns2 -> [RemoteLocation(/a/b/c)]} + * @param path the path to get the locations. + * @param locations a map to store all the locations and key is namespace id. + * @throws IOException + */ + @VisibleForTesting + void getAllLocations(String path, Map<String, List<RemoteLocation>> locations) + throws IOException { + try { + List<RemoteLocation> parentLocations = + rpcServer.getLocationsForPath(path, false, false); + parentLocations.forEach( + l -> locations.computeIfAbsent(l.getNameserviceId(), k -> new ArrayList<>()).add(l)); + } catch (NoLocationException | RouterResolveException e) { + LOG.debug(""); + } + + final List<String> children = subclusterResolver.getMountPoints(path); + if (children != null) { + for (String child : children) { + Path childPath = new Path(path, child); + getAllLocations(childPath.toUri().getPath(), locations); + } + } + } + + /** + * Get all the locations of the path for {@link this#getContentSummary(String)}. + * For example, there are some mount points: + * /a -> ns0 -> /a + * /a/b -> ns0 -> /a/b + * /a/b/c -> ns1 -> /a/b/c + * When the path is '/a', the result of locations should be + * [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')] + * When the path is '/b', will throw NoLocationException. + * @param path the path to get content summary + * @return one list contains all the remote location + * @throws IOException + */ + @VisibleForTesting + List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException { + final Map<String, List<RemoteLocation>> ns2Locations = new ConcurrentHashMap<>(); Review Comment: Why concurrent? 
########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableWithoutDefaultNS.java: ########## @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Test a router end-to-end including the MountTable without default nameservice. + */ +public class TestRouterMountTableWithoutDefaultNS { + private static StateStoreDFSCluster cluster; + private static RouterContext routerContext; + private static MountTableResolver mountTable; + private static FileSystem nnFs0; + private static FileSystem nnFs1; + + @BeforeClass + public static void globalSetUp() throws Exception { + // Build and start a federated cluster + cluster = new StateStoreDFSCluster(false, 2); + Configuration conf = new RouterConfigBuilder() + .stateStore() + .admin() + .rpc() + .build(); + conf.setInt(RBFConfigKeys.DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_KEY, 20); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE, false); + cluster.addRouterOverrides(conf); + cluster.startCluster(); + cluster.startRouters(); + cluster.waitClusterUp(); + + nnFs0 = cluster.getNamenode("ns0", null).getFileSystem(); + nnFs1 = cluster.getNamenode("ns1", null).getFileSystem(); + routerContext = cluster.getRandomRouter(); + + // Get the end points + routerContext = cluster.getRandomRouter(); + Router router = routerContext.getRouter(); + 
mountTable = (MountTableResolver) router.getSubclusterResolver(); + } + + @AfterClass + public static void tearDown() { + if (cluster != null) { + cluster.stopRouter(routerContext); + cluster.shutdown(); + cluster = null; + } + } + + @After + public void clearMountTable() throws IOException { + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTableManager = client.getMountTableManager(); + GetMountTableEntriesRequest req1 = GetMountTableEntriesRequest.newInstance("/"); + GetMountTableEntriesResponse response = mountTableManager.getMountTableEntries(req1); + for (MountTable entry : response.getEntries()) { + RemoveMountTableEntryRequest req2 = + RemoveMountTableEntryRequest.newInstance(entry.getSourcePath()); + mountTableManager.removeMountTableEntry(req2); + } + } + + /** + * Add a mount table entry to the mount table through the admin API. + * @param entry Mount table entry to add. + * @return If it was succesfully added. + * @throws IOException Problems adding entries. + */ + private boolean addMountTable(final MountTable entry) throws IOException { + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTableManager = client.getMountTableManager(); + AddMountTableEntryRequest addRequest = AddMountTableEntryRequest.newInstance(entry); + AddMountTableEntryResponse addResponse = mountTableManager.addMountTableEntry(addRequest); + + // Reload the Router cache + mountTable.loadCache(true); + + return addResponse.getStatus(); + } + + /** + * Verify that RBF that disable default nameservice should support + * get information about ancestor mount points. 
+ */ + @Test + public void testGetContentSummaryWithSubMountPoint() throws IOException { + MountTable addEntry = MountTable.newInstance("/testdir/1/2", + Collections.singletonMap("ns0", "/testdir/1/2")); + assertTrue(addMountTable(addEntry)); + + try { + writeData(nnFs0, new Path("/testdir/1/2/3"), 10 * 1024 * 1024); + + RouterRpcServer routerRpcServer = routerContext.getRouterRpcServer(); + ContentSummary summaryFromRBF = routerRpcServer.getContentSummary("/testdir"); + assertNotNull(summaryFromRBF); + assertEquals(1, summaryFromRBF.getFileCount()); + assertEquals(10 * 1024 * 1024, summaryFromRBF.getLength()); + } finally { + nnFs0.delete(new Path("/testdir"), true); + } + } + + @Test + public void testGetAllLocations() throws IOException { + // Add mount table entry. + MountTable addEntry = MountTable.newInstance("/testA", + Collections.singletonMap("ns0", "/testA")); + assertTrue(addMountTable(addEntry)); + addEntry = MountTable.newInstance("/testA/testB", + Collections.singletonMap("ns1", "/testA/testB")); + assertTrue(addMountTable(addEntry)); + addEntry = MountTable.newInstance("/testA/testB/testC", + Collections.singletonMap("ns2", "/testA/testB/testC")); + assertTrue(addMountTable(addEntry)); + + Map<String, List<RemoteLocation>> locations = new HashMap<>(); + RouterClientProtocol protocol = routerContext.getRouterRpcServer().getClientProtocolModule(); + protocol.getAllLocations("/testA", locations); + assertEquals(3, locations.size()); + } + + @Test + public void testGetLocationsForContentSummary() throws Exception { + // Add mount table entry. 
+ MountTable addEntry = MountTable.newInstance("/testA/testB", + Collections.singletonMap("ns0", "/testA/testB")); + assertTrue(addMountTable(addEntry)); + addEntry = MountTable.newInstance("/testA/testB/testC", + Collections.singletonMap("ns1", "/testA/testB/testC")); + assertTrue(addMountTable(addEntry)); + + RouterClientProtocol protocol = routerContext.getRouterRpcServer().getClientProtocolModule(); + List<RemoteLocation> locations = protocol.getLocationsForContentSummary("/testA"); + assertEquals(2, locations.size()); + + for (RemoteLocation location : locations) { + String nsId = location.getNameserviceId(); + if ("ns0".equals(nsId)) { + assertEquals("/testA/testB", location.getDest()); + } else if ("ns1".equals(nsId)) { + assertEquals("/testA/testB/testC", location.getDest()); + } else { + fail(); Review Comment: Add a message ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java: ########## @@ -1242,16 +1244,98 @@ public void setBalancerBandwidth(long bandwidth) throws IOException { rpcClient.invokeConcurrent(nss, method, true, false); } + /** + * Recursively get all the locations for the path. + * For example, there are some mount points: + * /a -> ns0 -> /a + * /a/b -> ns1 -> /a/b + * /a/b/c -> ns2 -> /a/b/c + * When the path is '/a', the result of locations should be + * {ns0 -> [RemoteLocation(/a)], ns1 -> [RemoteLocation(/a/b)], ns2 -> [RemoteLocation(/a/b/c)]} + * @param path the path to get the locations. + * @param locations a map to store all the locations and key is namespace id. 
+ * @throws IOException + */ + @VisibleForTesting + void getAllLocations(String path, Map<String, List<RemoteLocation>> locations) + throws IOException { + try { + List<RemoteLocation> parentLocations = + rpcServer.getLocationsForPath(path, false, false); + parentLocations.forEach( + l -> locations.computeIfAbsent(l.getNameserviceId(), k -> new ArrayList<>()).add(l)); + } catch (NoLocationException | RouterResolveException e) { + LOG.debug(""); Review Comment: Something more meaningful? ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java: ########## @@ -1242,16 +1244,98 @@ public void setBalancerBandwidth(long bandwidth) throws IOException { rpcClient.invokeConcurrent(nss, method, true, false); } + /** + * Recursively get all the locations for the path. + * For example, there are some mount points: + * /a -> ns0 -> /a + * /a/b -> ns1 -> /a/b + * /a/b/c -> ns2 -> /a/b/c + * When the path is '/a', the result of locations should be + * {ns0 -> [RemoteLocation(/a)], ns1 -> [RemoteLocation(/a/b)], ns2 -> [RemoteLocation(/a/b/c)]} + * @param path the path to get the locations. + * @param locations a map to store all the locations and key is namespace id. 
+ * @throws IOException + */ + @VisibleForTesting + void getAllLocations(String path, Map<String, List<RemoteLocation>> locations) + throws IOException { + try { + List<RemoteLocation> parentLocations = + rpcServer.getLocationsForPath(path, false, false); + parentLocations.forEach( + l -> locations.computeIfAbsent(l.getNameserviceId(), k -> new ArrayList<>()).add(l)); + } catch (NoLocationException | RouterResolveException e) { + LOG.debug(""); + } + + final List<String> children = subclusterResolver.getMountPoints(path); + if (children != null) { + for (String child : children) { + Path childPath = new Path(path, child); + getAllLocations(childPath.toUri().getPath(), locations); + } + } + } + + /** + * Get all the locations of the path for {@link this#getContentSummary(String)}. + * For example, there are some mount points: + * /a -> ns0 -> /a + * /a/b -> ns0 -> /a/b + * /a/b/c -> ns1 -> /a/b/c + * When the path is '/a', the result of locations should be + * [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')] + * When the path is '/b', will throw NoLocationException. + * @param path the path to get content summary + * @return one list contains all the remote location + * @throws IOException + */ + @VisibleForTesting + List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException { + final Map<String, List<RemoteLocation>> ns2Locations = new ConcurrentHashMap<>(); + final List<RemoteLocation> locations = new ArrayList<>(); + + // Try to get all the locations of the path. + getAllLocations(path, ns2Locations); + + if (ns2Locations.isEmpty()) { + throw new NoLocationException(path, subclusterResolver.getClass()); + } + + // remove the redundancy remoteLocation order by destination. 
+ ns2Locations.forEach((k, v) -> { + List<RemoteLocation> sortedList = v.stream().sorted().collect(Collectors.toList()); + int size = sortedList.size(); + for (int i = size - 1; i > -1; i--) { + RemoteLocation currentLocation = sortedList.get(i); + if (i - 1 == -1) { + locations.add(currentLocation); + continue; + } + + RemoteLocation preLocation = sortedList.get(i - 1); + if (!currentLocation.getDest().startsWith(preLocation.getDest() + Path.SEPARATOR)) { + locations.add(currentLocation); + } else { + LOG.debug(""); + } + } + }); + + return locations; + } + @Override public ContentSummary getContentSummary(String path) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); // Get the summaries from regular files final Collection<ContentSummary> summaries = new ArrayList<>(); - final List<RemoteLocation> locations = - rpcServer.getLocationsForPath(path, false, false); final RemoteMethod method = new RemoteMethod("getContentSummary", new Class<?>[] {String.class}, new RemoteParam()); + + final List<RemoteLocation> locations = getLocationsForContentSummary(path); Review Comment: Can we have this in the line it was by itself? ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java: ########## @@ -1242,16 +1244,98 @@ public void setBalancerBandwidth(long bandwidth) throws IOException { rpcClient.invokeConcurrent(nss, method, true, false); } + /** + * Recursively get all the locations for the path. + * For example, there are some mount points: + * /a -> ns0 -> /a + * /a/b -> ns1 -> /a/b + * /a/b/c -> ns2 -> /a/b/c + * When the path is '/a', the result of locations should be + * {ns0 -> [RemoteLocation(/a)], ns1 -> [RemoteLocation(/a/b)], ns2 -> [RemoteLocation(/a/b/c)]} + * @param path the path to get the locations. + * @param locations a map to store all the locations and key is namespace id. 
+ * @throws IOException + */ + @VisibleForTesting + void getAllLocations(String path, Map<String, List<RemoteLocation>> locations) + throws IOException { + try { + List<RemoteLocation> parentLocations = + rpcServer.getLocationsForPath(path, false, false); + parentLocations.forEach( + l -> locations.computeIfAbsent(l.getNameserviceId(), k -> new ArrayList<>()).add(l)); + } catch (NoLocationException | RouterResolveException e) { + LOG.debug(""); + } + + final List<String> children = subclusterResolver.getMountPoints(path); + if (children != null) { + for (String child : children) { + Path childPath = new Path(path, child); + getAllLocations(childPath.toUri().getPath(), locations); + } + } + } + + /** + * Get all the locations of the path for {@link this#getContentSummary(String)}. + * For example, there are some mount points: + * /a -> ns0 -> /a + * /a/b -> ns0 -> /a/b + * /a/b/c -> ns1 -> /a/b/c + * When the path is '/a', the result of locations should be + * [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')] + * When the path is '/b', will throw NoLocationException. + * @param path the path to get content summary + * @return one list contains all the remote location + * @throws IOException + */ + @VisibleForTesting + List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException { + final Map<String, List<RemoteLocation>> ns2Locations = new ConcurrentHashMap<>(); + final List<RemoteLocation> locations = new ArrayList<>(); + + // Try to get all the locations of the path. + getAllLocations(path, ns2Locations); + + if (ns2Locations.isEmpty()) { + throw new NoLocationException(path, subclusterResolver.getClass()); + } + + // remove the redundancy remoteLocation order by destination. 
+ ns2Locations.forEach((k, v) -> { + List<RemoteLocation> sortedList = v.stream().sorted().collect(Collectors.toList()); + int size = sortedList.size(); + for (int i = size - 1; i > -1; i--) { + RemoteLocation currentLocation = sortedList.get(i); + if (i - 1 == -1) { Review Comment: i == 0 ? > RBF: fix some bugs when handling getContentSummary RPC > ------------------------------------------------------ > > Key: HDFS-16734 > URL: https://issues.apache.org/jira/browse/HDFS-16734 > Project: Hadoop HDFS > Issue Type: Bug > Reporter: ZanderXu > Assignee: ZanderXu > Priority: Major > Labels: pull-request-available > > Suppose there are the following mount points in RBF without a default > namespace. > ||Source Path||NameSpace||Destination Path || > |/a/b|ns0|/a/b| > |/a/b/c|ns0|/a/b/c| > |/a/b/c/d|ns1|/a/b/c/d| > Suppose there is a file /a/b/c/file1 with 10MB of data in ns0 and a file > /a/b/c/d/file2 with 20MB of data in ns1. > There are bugs when handling the following cases: > ||Case Number||Case||Current Result||Expected Result|| > |1|getContentSummary('/a')|Throw RouterResolveException |2 files and 30MB data| > |2|getContentSummary('/a/b')|3 files and 40MB data|2 files and 30MB data| > Bugs for these cases: > Case 1: If no locations can be found for the path, RBF should fall back to > its sub mount points. > Case 2: RBF should not repeatedly get the content summary from the same namespace > for paths that share an ancestor, such as from ns0 with /a/b and from ns0 with /a/b/c > (which counts /a/b/c/file1 twice). -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org