This is an automated email from the ASF dual-hosted git repository. hexiaoqiao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 246ebe809c8 HDFS-17731. [ARR] Add unit test for async RouterAdminServer. (#7409). Contributed by hfutatzhanghb. 246ebe809c8 is described below commit 246ebe809c8fef0dea255906c4c489f08f8e5376 Author: hfutatzhanghb <hfutzhan...@163.com> AuthorDate: Mon Mar 3 11:26:50 2025 +0800 HDFS-17731. [ARR] Add unit test for async RouterAdminServer. (#7409). Contributed by hfutatzhanghb. Reviewed-by: Jian Zhang <keeprom...@apache.org> Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org> --- .../hdfs/server/federation/router/Router.java | 2 +- .../server/federation/router/TestRouterAdmin.java | 87 +++++++-------- .../router/async/TestAsyncRouterAdmin.java | 119 +++++++++++++++++++++ 3 files changed, 164 insertions(+), 44 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java index 3d996b3e849..14cc47ffa1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -409,7 +409,7 @@ public void run() { * @return New Router RPC Server. * @throws IOException If the router RPC server was not started. */ - protected RouterRpcServer createRpcServer() throws IOException { + public RouterRpcServer createRpcServer() throws IOException { return new RouterRpcServer(this.conf, this, this.getNamenodeResolver(), this.getSubclusterResolver()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java index 205f36dbb01..6065e0e6ec3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java @@ -74,20 +74,21 @@ * The administrator interface of the {@link Router} implemented by * {@link RouterAdminServer}. */ +@SuppressWarnings("checkstyle:visibilitymodifier") public class TestRouterAdmin { - private static StateStoreDFSCluster cluster; - private static RouterContext routerContext; + protected static StateStoreDFSCluster cluster; + protected static RouterContext routerContext; public static final String RPC_BEAN = "Hadoop:service=Router,name=FederationRPC"; - private static List<MountTable> mockMountTable; - private static StateStoreService stateStore; - private static RouterRpcClient mockRpcClient; + protected static List<MountTable> mockMountTable; + protected static StateStoreService stateStore; + protected static RouterRpcClient mockRpcClient; @BeforeClass public static void globalSetUp() throws Exception { cluster = new StateStoreDFSCluster(false, 1); - // Build and start a router with State Store + admin + RPC + // Build and start a router with State Store + admin + RPC. Configuration conf = new RouterConfigBuilder() .stateStore() .admin() @@ -101,7 +102,7 @@ public static void globalSetUp() throws Exception { Router router = routerContext.getRouter(); stateStore = router.getStateStore(); - // Add two name services for testing disabling + // Add two name services for testing disabling. ActiveNamenodeResolver membership = router.getNamenodeResolver(); membership.registerNamenode( createNamenodeReport("ns0", "nn1", HAServiceState.ACTIVE)); @@ -129,12 +130,12 @@ private static void setUpMocks() throws IOException, NoSuchFieldException, IllegalAccessException { RouterRpcServer spyRpcServer = Mockito.spy(routerContext.getRouter().createRpcServer()); - //Used reflection to set the 'rpcServer field' + // Used reflection to set the 'rpcServer field'. setField(routerContext.getRouter(), "rpcServer", spyRpcServer); Mockito.doReturn(null).when(spyRpcServer).getFileInfo(Mockito.anyString()); - // mock rpc client for destination check when editing mount tables. - //spy RPC client and used reflection to set the 'rpcClient' field + // Mock rpc client for destination check when editing mount tables. + // Spy RPC client and used reflection to set the 'rpcClient' field. mockRpcClient = Mockito.spy(spyRpcServer.getRPCClient()); setField(spyRpcServer, "rpcClient", mockRpcClient); RemoteLocation remoteLocation0 = @@ -172,7 +173,7 @@ public static void tearDown() { public void testSetup() throws Exception { assertTrue( synchronizeRecords(stateStore, mockMountTable, MountTable.class)); - // Avoid running with random users + // Avoid running with random users. routerContext.resetAdminClient(); } @@ -185,18 +186,18 @@ public void testAddMountTable() throws IOException { RouterClient client = routerContext.getAdminClient(); MountTableManager mountTable = client.getMountTableManager(); - // Existing mount table size + // Existing mount table size. List<MountTable> records = getMountTableEntries(mountTable); assertEquals(records.size(), mockMountTable.size()); - // Add + // Add. AddMountTableEntryRequest addRequest = AddMountTableEntryRequest.newInstance(newEntry); AddMountTableEntryResponse addResponse = mountTable.addMountTableEntry(addRequest); assertTrue(addResponse.getStatus()); - // New mount table size + // New mount table size. List<MountTable> records2 = getMountTableEntries(mountTable); assertEquals(records2.size(), mockMountTable.size() + 1); } @@ -209,22 +210,22 @@ public void testAddDuplicateMountTable() throws IOException { RouterClient client = routerContext.getAdminClient(); MountTableManager mountTable = client.getMountTableManager(); - // Existing mount table size + // Existing mount table size. List<MountTable> entries1 = getMountTableEntries(mountTable); assertEquals(entries1.size(), mockMountTable.size()); - // Add + // Add. AddMountTableEntryRequest addRequest = AddMountTableEntryRequest.newInstance(newEntry); AddMountTableEntryResponse addResponse = mountTable.addMountTableEntry(addRequest); assertTrue(addResponse.getStatus()); - // New mount table size + // New mount table size. List<MountTable> entries2 = getMountTableEntries(mountTable); assertEquals(entries2.size(), mockMountTable.size() + 1); - // Add again, should fail + // Add again, should fail. AddMountTableEntryResponse addResponse2 = mountTable.addMountTableEntry(addRequest); assertFalse(addResponse2.getStatus()); @@ -240,27 +241,27 @@ public void testAddReadOnlyMountTable() throws IOException { RouterClient client = routerContext.getAdminClient(); MountTableManager mountTable = client.getMountTableManager(); - // Existing mount table size + // Existing mount table size. List<MountTable> records = getMountTableEntries(mountTable); assertEquals(records.size(), mockMountTable.size()); - // Add + // Add. AddMountTableEntryRequest addRequest = AddMountTableEntryRequest.newInstance(newEntry); AddMountTableEntryResponse addResponse = mountTable.addMountTableEntry(addRequest); assertTrue(addResponse.getStatus()); - // New mount table size + // New mount table size. List<MountTable> records2 = getMountTableEntries(mountTable); assertEquals(records2.size(), mockMountTable.size() + 1); - // Check that we have the read only entry + // Check that we have the read only entry. MountTable record = getMountTableEntry("/readonly"); assertEquals("/readonly", record.getSourcePath()); assertTrue(record.isReadOnly()); - // Removing the new entry + // Removing the new entry. RemoveMountTableEntryRequest removeRequest = RemoveMountTableEntryRequest.newInstance("/readonly"); RemoveMountTableEntryResponse removeResponse = @@ -287,19 +288,19 @@ private void testAddOrderMountTable(final DestinationOrder order) RouterClient client = routerContext.getAdminClient(); MountTableManager mountTable = client.getMountTableManager(); - // Add + // Add. AddMountTableEntryRequest addRequest; AddMountTableEntryResponse addResponse; addRequest = AddMountTableEntryRequest.newInstance(newEntry); addResponse = mountTable.addMountTableEntry(addRequest); assertTrue(addResponse.getStatus()); - // Check that we have the read only entry + // Check that we have the read only entry. MountTable record = getMountTableEntry(mnt); assertEquals(mnt, record.getSourcePath()); assertEquals(order, record.getDestOrder()); - // Removing the new entry + // Removing the new entry. RemoveMountTableEntryRequest removeRequest = RemoveMountTableEntryRequest.newInstance(mnt); RemoveMountTableEntryResponse removeResponse = @@ -313,16 +314,16 @@ public void testRemoveMountTable() throws IOException { RouterClient client = routerContext.getAdminClient(); MountTableManager mountTable = client.getMountTableManager(); - // Existing mount table size + // Existing mount table size. List<MountTable> entries1 = getMountTableEntries(mountTable); assertEquals(entries1.size(), mockMountTable.size()); - // Remove an entry + // Remove an entry. RemoveMountTableEntryRequest removeRequest = RemoveMountTableEntryRequest.newInstance("/"); mountTable.removeMountTableEntry(removeRequest); - // New mount table size + // New mount table size. List<MountTable> entries2 = getMountTableEntries(mountTable); assertEquals(entries2.size(), mockMountTable.size() - 1); } @@ -333,7 +334,7 @@ public void testEditMountTable() throws IOException { RouterClient client = routerContext.getAdminClient(); MountTableManager mountTable = client.getMountTableManager(); - // Verify starting condition + // Verify starting condition. MountTable entry = getMountTableEntry("/"); assertEquals( Collections.singletonList(new RemoteLocation("ns0", "/", "/")), @@ -346,7 +347,7 @@ public void testEditMountTable() throws IOException { UpdateMountTableEntryRequest.newInstance(updatedEntry); mountTable.updateMountTableEntry(updateRequest); - // Verify edited condition + // Verify edited condition. entry = getMountTableEntry("/"); assertEquals( Collections.singletonList(new RemoteLocation("ns1", "/", "/")), @@ -359,11 +360,11 @@ public void testGetMountTable() throws IOException { RouterClient client = routerContext.getAdminClient(); MountTableManager mountTable = client.getMountTableManager(); - // Verify size of table + // Verify size of table. List<MountTable> entries = getMountTableEntries(mountTable); assertEquals(mockMountTable.size(), entries.size()); - // Verify all entries are present + // Verify all entries are present. int matches = 0; for (MountTable e : entries) { for (MountTable entry : mockMountTable) { @@ -387,7 +388,7 @@ public void testGetSingleMountTableEntry() throws IOException { @Test public void testVerifyFileInDestinations() throws IOException { - // this entry has been created in the mock setup + // This entry has been created in the mock setup. MountTable newEntry = MountTable.newInstance( "/testpath", Collections.singletonMap("ns0", "/testdir"), Time.now(), Time.now()); @@ -396,7 +397,7 @@ public void testVerifyFileInDestinations() throws IOException { List<String> result = adminServer.verifyFileInDestinations(newEntry); assertEquals(0, result.size()); - // this entry was not created in the mock + // This entry was not created in the mock. newEntry = MountTable.newInstance( "/testpath", Collections.singletonMap("ns0", "/testdir1"), Time.now(), Time.now()); @@ -413,7 +414,7 @@ public void testVerifyFileInDestinations() throws IOException { * @throws IOException If the state store could not be accessed. */ private MountTable getMountTableEntry(final String mount) throws IOException { - // Refresh the cache + // Refresh the cache. stateStore.loadCache(MountTableStoreImpl.class, true); GetMountTableEntriesRequest request = @@ -422,7 +423,7 @@ private MountTable getMountTableEntry(final String mount) throws IOException { MountTableManager mountTable = client.getMountTableManager(); List<MountTable> results = getMountTableEntries(mountTable, request); if (results.size() > 0) { - // First result is sorted to have the shortest mount string length + // First result is sorted to have the shortest mount string length. return results.get(0); } return null; @@ -449,22 +450,22 @@ public void testNameserviceManager() throws IOException { RouterClient client = routerContext.getAdminClient(); NameserviceManager nsManager = client.getNameserviceManager(); - // There shouldn't be any name service disabled + // There shouldn't be any name service disabled. Set<String> disabled = getDisabledNameservices(nsManager); assertTrue(disabled.isEmpty()); - // Disable one and see it + // Disable one and see it. DisableNameserviceRequest disableReq = DisableNameserviceRequest.newInstance("ns0"); DisableNameserviceResponse disableResp = nsManager.disableNameservice(disableReq); assertTrue(disableResp.getStatus()); - // Refresh the cache + // Refresh the cache. disabled = getDisabledNameservices(nsManager); assertEquals(1, disabled.size()); assertTrue(disabled.contains("ns0")); - // Enable one and we should have no disabled name services + // Enable one and we should have no disabled name services. EnableNameserviceRequest enableReq = EnableNameserviceRequest.newInstance("ns0"); EnableNameserviceResponse enableResp = @@ -473,7 +474,7 @@ public void testNameserviceManager() throws IOException { disabled = getDisabledNameservices(nsManager); assertTrue(disabled.isEmpty()); - // Non existing name services should fail + // Non existing name services should fail. disableReq = DisableNameserviceRequest.newInstance("nsunknown"); disableResp = nsManager.disableNameservice(disableReq); assertFalse(disableResp.getStatus()); @@ -503,7 +504,7 @@ public void testNameserviceManagerUnauthorized() throws Exception{ @Test public void testNameserviceManagerWithRules() throws Exception{ - // Try to disable a name service with a kerberos principal name + // Try to disable a name service with a kerberos principal name. String username = RouterAdminServer.getSuperUser() + "@Example.com"; DisableNameserviceResponse disableResp = testNameserviceManagerUser(username); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestAsyncRouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestAsyncRouterAdmin.java new file mode 100644 index 00000000000..ec0ebd99512 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestAsyncRouterAdmin.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.router.TestRouterAdmin; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil; +import org.apache.hadoop.util.Lists; +import org.junit.BeforeClass; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY; + +public class TestAsyncRouterAdmin extends TestRouterAdmin { + + @BeforeClass + public static void globalSetUp() throws Exception { + cluster = new StateStoreDFSCluster(false, 1); + // Build and start a router with State Store + admin + RPC. + Configuration conf = new RouterConfigBuilder() + .stateStore() + .admin() + .rpc() + .build(); + conf.setBoolean(DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE, true); + conf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); + + cluster.addRouterOverrides(conf); + cluster.startRouters(); + routerContext = cluster.getRandomRouter(); + mockMountTable = cluster.generateMockMountTable(); + Router router = routerContext.getRouter(); + stateStore = router.getStateStore(); + + // Add two name services for testing disabling. + ActiveNamenodeResolver membership = router.getNamenodeResolver(); + membership.registerNamenode( + createNamenodeReport("ns0", "nn1", HAServiceProtocol.HAServiceState.ACTIVE)); + membership.registerNamenode( + createNamenodeReport("ns1", "nn1", HAServiceProtocol.HAServiceState.ACTIVE)); + stateStore.refreshCaches(true); + + setUpMocks(); + } + + private static void setUpMocks() + throws IOException, NoSuchFieldException, IllegalAccessException { + RouterRpcServer spyRpcServer = + Mockito.spy(routerContext.getRouter().createRpcServer()); + // Used reflection to set the 'rpcServer field'. + setField(routerContext.getRouter(), "rpcServer", spyRpcServer); + Mockito.doReturn(null).when(spyRpcServer).getFileInfo(Mockito.anyString()); + + // Mock rpc client for destination check when editing mount tables. + // Spy RPC client and used reflection to set the 'rpcClient' field. + mockRpcClient = Mockito.spy(spyRpcServer.getRPCClient()); + setField(spyRpcServer, "rpcClient", mockRpcClient); + RemoteLocation remoteLocation0 = + new RemoteLocation("ns0", "/testdir", null); + RemoteLocation remoteLocation1 = + new RemoteLocation("ns1", "/", null); + final Map<RemoteLocation, HdfsFileStatus> mockResponse0 = new HashMap<>(); + final Map<RemoteLocation, HdfsFileStatus> mockResponse1 = new HashMap<>(); + mockResponse0.put(remoteLocation0, + new HdfsFileStatus.Builder().build()); + Mockito.doAnswer(invocationOnMock -> { + AsyncUtil.asyncComplete(mockResponse0); + return null; + }).when(mockRpcClient).invokeConcurrent( + Mockito.eq(Lists.newArrayList(remoteLocation0)), + Mockito.any(RemoteMethod.class), + Mockito.eq(false), + Mockito.eq(false), + Mockito.eq(HdfsFileStatus.class) + ); + mockResponse1.put(remoteLocation1, + new HdfsFileStatus.Builder().build()); + Mockito.doAnswer(invocationOnMock -> { + AsyncUtil.asyncComplete(mockResponse1); + return null; + }).when(mockRpcClient).invokeConcurrent( + Mockito.eq(Lists.newArrayList(remoteLocation1)), + Mockito.any(RemoteMethod.class), + Mockito.eq(false), + Mockito.eq(false), + Mockito.eq(HdfsFileStatus.class) + ); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org