http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java new file mode 100644 index 0000000..6111c6b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java @@ -0,0 +1,477 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; + +import java.io.PrintStream; +import java.net.InetSocketAddress; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.tools.federation.RouterAdmin; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.ToolRunner; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.base.Supplier; + +/** + * Tests Router admin commands. 
+ */ +public class TestRouterAdminCLI { + private static StateStoreDFSCluster cluster; + private static RouterContext routerContext; + private static StateStoreService stateStore; + + private static RouterAdmin admin; + private static RouterClient client; + + private static final String TEST_USER = "test-user"; + + private final ByteArrayOutputStream out = new ByteArrayOutputStream(); + private static final PrintStream OLD_OUT = System.out; + + @BeforeClass + public static void globalSetUp() throws Exception { + cluster = new StateStoreDFSCluster(false, 1); + // Build and start a router with State Store + admin + RPC + Configuration conf = new RouterConfigBuilder() + .stateStore() + .admin() + .rpc() + .build(); + cluster.addRouterOverrides(conf); + + // Start routers + cluster.startRouters(); + + routerContext = cluster.getRandomRouter(); + Router router = routerContext.getRouter(); + stateStore = router.getStateStore(); + + Configuration routerConf = new Configuration(); + InetSocketAddress routerSocket = router.getAdminServerAddress(); + routerConf.setSocketAddr(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, + routerSocket); + admin = new RouterAdmin(routerConf); + client = routerContext.getAdminClient(); + } + + @AfterClass + public static void tearDownCluster() { + cluster.stopRouter(routerContext); + cluster.shutdown(); + cluster = null; + } + + @After + public void tearDown() { + // set back system out + System.setOut(OLD_OUT); + } + + @Test + public void testAddMountTable() throws Exception { + String nsId = "ns0"; + String src = "/test-addmounttable"; + String dest = "/addmounttable"; + String[] argv = new String[] {"-add", src, nsId, dest}; + assertEquals(0, ToolRunner.run(admin, argv)); + + stateStore.loadCache(MountTableStoreImpl.class, true); + GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest + .newInstance(src); + GetMountTableEntriesResponse getResponse = client.getMountTableManager() + .getMountTableEntries(getRequest); + MountTable 
mountTable = getResponse.getEntries().get(0); + + List<RemoteLocation> destinations = mountTable.getDestinations(); + assertEquals(1, destinations.size()); + + assertEquals(src, mountTable.getSourcePath()); + assertEquals(nsId, destinations.get(0).getNameserviceId()); + assertEquals(dest, destinations.get(0).getDest()); + assertFalse(mountTable.isReadOnly()); + + // test mount table update behavior + dest = dest + "-new"; + argv = new String[] {"-add", src, nsId, dest, "-readonly"}; + assertEquals(0, ToolRunner.run(admin, argv)); + stateStore.loadCache(MountTableStoreImpl.class, true); + + getResponse = client.getMountTableManager() + .getMountTableEntries(getRequest); + mountTable = getResponse.getEntries().get(0); + assertEquals(2, mountTable.getDestinations().size()); + assertEquals(nsId, mountTable.getDestinations().get(1).getNameserviceId()); + assertEquals(dest, mountTable.getDestinations().get(1).getDest()); + assertTrue(mountTable.isReadOnly()); + } + + @Test + public void testAddOrderMountTable() throws Exception { + testAddOrderMountTable(DestinationOrder.HASH); + testAddOrderMountTable(DestinationOrder.LOCAL); + testAddOrderMountTable(DestinationOrder.RANDOM); + testAddOrderMountTable(DestinationOrder.HASH_ALL); + } + + private void testAddOrderMountTable(DestinationOrder order) + throws Exception { + final String mnt = "/" + order; + final String nsId = "ns0,ns1"; + final String dest = "/"; + String[] argv = new String[] { + "-add", mnt, nsId, dest, "-order", order.toString()}; + assertEquals(0, ToolRunner.run(admin, argv)); + + // Check the state in the State Store + stateStore.loadCache(MountTableStoreImpl.class, true); + MountTableManager mountTable = client.getMountTableManager(); + GetMountTableEntriesRequest request = + GetMountTableEntriesRequest.newInstance(mnt); + GetMountTableEntriesResponse response = + mountTable.getMountTableEntries(request); + List<MountTable> entries = response.getEntries(); + assertEquals(1, entries.size()); + 
assertEquals(2, entries.get(0).getDestinations().size()); + assertEquals(order, response.getEntries().get(0).getDestOrder()); + } + + @Test + public void testListMountTable() throws Exception { + String nsId = "ns0"; + String src = "/test-lsmounttable"; + String dest = "/lsmounttable"; + String[] argv = new String[] {"-add", src, nsId, dest}; + assertEquals(0, ToolRunner.run(admin, argv)); + + // re-set system out for testing + System.setOut(new PrintStream(out)); + stateStore.loadCache(MountTableStoreImpl.class, true); + argv = new String[] {"-ls", src}; + assertEquals(0, ToolRunner.run(admin, argv)); + assertTrue(out.toString().contains(src)); + + out.reset(); + GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest + .newInstance("/"); + GetMountTableEntriesResponse getResponse = client.getMountTableManager() + .getMountTableEntries(getRequest); + + // Test ls command without input path, it will list + // mount table under root path. + argv = new String[] {"-ls"}; + assertEquals(0, ToolRunner.run(admin, argv)); + assertTrue(out.toString().contains(src)); + String outStr = out.toString(); + // verify if all the mount table are listed + for(MountTable entry: getResponse.getEntries()) { + assertTrue(outStr.contains(entry.getSourcePath())); + } + } + + @Test + public void testRemoveMountTable() throws Exception { + String nsId = "ns0"; + String src = "/test-rmmounttable"; + String dest = "/rmmounttable"; + String[] argv = new String[] {"-add", src, nsId, dest}; + assertEquals(0, ToolRunner.run(admin, argv)); + + stateStore.loadCache(MountTableStoreImpl.class, true); + GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest + .newInstance(src); + GetMountTableEntriesResponse getResponse = client.getMountTableManager() + .getMountTableEntries(getRequest); + // ensure mount table added successfully + MountTable mountTable = getResponse.getEntries().get(0); + assertEquals(src, mountTable.getSourcePath()); + + argv = new String[] {"-rm", src}; 
+ assertEquals(0, ToolRunner.run(admin, argv)); + + stateStore.loadCache(MountTableStoreImpl.class, true); + getResponse = client.getMountTableManager() + .getMountTableEntries(getRequest); + assertEquals(0, getResponse.getEntries().size()); + + // remove an invalid mount table + String invalidPath = "/invalid"; + System.setOut(new PrintStream(out)); + argv = new String[] {"-rm", invalidPath}; + assertEquals(0, ToolRunner.run(admin, argv)); + assertTrue(out.toString().contains( + "Cannot remove mount point " + invalidPath)); + } + + @Test + public void testMountTableDefaultACL() throws Exception { + String[] argv = new String[] {"-add", "/testpath0", "ns0", "/testdir0"}; + assertEquals(0, ToolRunner.run(admin, argv)); + + stateStore.loadCache(MountTableStoreImpl.class, true); + GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest + .newInstance("/testpath0"); + GetMountTableEntriesResponse getResponse = client.getMountTableManager() + .getMountTableEntries(getRequest); + MountTable mountTable = getResponse.getEntries().get(0); + + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + String group = ugi.getGroups().isEmpty() ? ugi.getShortUserName() + : ugi.getPrimaryGroupName(); + assertEquals(ugi.getShortUserName(), mountTable.getOwnerName()); + assertEquals(group, mountTable.getGroupName()); + assertEquals((short) 0755, mountTable.getMode().toShort()); + } + + @Test + public void testMountTablePermissions() throws Exception { + // re-set system out for testing + System.setOut(new PrintStream(out)); + // use superuser to add new mount table with only read permission + String[] argv = new String[] {"-add", "/testpath2-1", "ns0", "/testdir2-1", + "-owner", TEST_USER, "-group", TEST_USER, "-mode", "0455"}; + assertEquals(0, ToolRunner.run(admin, argv)); + + String superUser = UserGroupInformation. 
+ getCurrentUser().getShortUserName(); + // use normal user as current user to test + UserGroupInformation remoteUser = UserGroupInformation + .createRemoteUser(TEST_USER); + UserGroupInformation.setLoginUser(remoteUser); + + // verify read permission by executing other commands + verifyExecutionResult("/testpath2-1", true, -1, -1); + + // add new mount table with only write permission + argv = new String[] {"-add", "/testpath2-2", "ns0", "/testdir2-2", + "-owner", TEST_USER, "-group", TEST_USER, "-mode", "0255"}; + assertEquals(0, ToolRunner.run(admin, argv)); + verifyExecutionResult("/testpath2-2", false, 0, 0); + + // set mount table entry with read and write permission + argv = new String[] {"-add", "/testpath2-3", "ns0", "/testdir2-3", + "-owner", TEST_USER, "-group", TEST_USER, "-mode", "0755"}; + assertEquals(0, ToolRunner.run(admin, argv)); + verifyExecutionResult("/testpath2-3", true, 0, 0); + + // set back login user + remoteUser = UserGroupInformation.createRemoteUser(superUser); + UserGroupInformation.setLoginUser(remoteUser); + } + + /** + * Verify router admin commands execution result. 
+ * + * @param mount + * target mount table + * @param canRead + * whether can list mount tables under specified mount + * @param addCommandCode + * expected return code of add command executed for specified mount + * @param rmCommandCode + * expected return code of rm command executed for specified mount + * @throws Exception + */ + private void verifyExecutionResult(String mount, boolean canRead, + int addCommandCode, int rmCommandCode) throws Exception { + String[] argv = null; + stateStore.loadCache(MountTableStoreImpl.class, true); + + out.reset(); + // execute ls command + argv = new String[] {"-ls", mount}; + assertEquals(0, ToolRunner.run(admin, argv)); + assertEquals(canRead, out.toString().contains(mount)); + + // execute add/update command + argv = new String[] {"-add", mount, "ns0", mount + "newdir"}; + assertEquals(addCommandCode, ToolRunner.run(admin, argv)); + + stateStore.loadCache(MountTableStoreImpl.class, true); + // execute remove command + argv = new String[] {"-rm", mount}; + assertEquals(rmCommandCode, ToolRunner.run(admin, argv)); + } + + @Test + public void testSetAndClearQuota() throws Exception { + String nsId = "ns0"; + String src = "/test-QuotaMounttable"; + String dest = "/QuotaMounttable"; + String[] argv = new String[] {"-add", src, nsId, dest}; + assertEquals(0, ToolRunner.run(admin, argv)); + + stateStore.loadCache(MountTableStoreImpl.class, true); + GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest + .newInstance(src); + GetMountTableEntriesResponse getResponse = client.getMountTableManager() + .getMountTableEntries(getRequest); + MountTable mountTable = getResponse.getEntries().get(0); + RouterQuotaUsage quotaUsage = mountTable.getQuota(); + + // verify the default quota set + assertEquals(RouterQuotaUsage.QUOTA_USAGE_COUNT_DEFAULT, + quotaUsage.getFileAndDirectoryCount()); + assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getQuota()); + assertEquals(RouterQuotaUsage.QUOTA_USAGE_COUNT_DEFAULT, + 
quotaUsage.getSpaceConsumed()); + assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getSpaceQuota()); + + long nsQuota = 50; + long ssQuota = 100; + argv = new String[] {"-setQuota", src, "-nsQuota", String.valueOf(nsQuota), + "-ssQuota", String.valueOf(ssQuota)}; + assertEquals(0, ToolRunner.run(admin, argv)); + + stateStore.loadCache(MountTableStoreImpl.class, true); + getResponse = client.getMountTableManager() + .getMountTableEntries(getRequest); + mountTable = getResponse.getEntries().get(0); + quotaUsage = mountTable.getQuota(); + + // verify if the quota is set + assertEquals(nsQuota, quotaUsage.getQuota()); + assertEquals(ssQuota, quotaUsage.getSpaceQuota()); + + // use quota string for setting ss quota + String newSsQuota = "2m"; + argv = new String[] {"-setQuota", src, "-ssQuota", newSsQuota}; + assertEquals(0, ToolRunner.run(admin, argv)); + + stateStore.loadCache(MountTableStoreImpl.class, true); + getResponse = client.getMountTableManager() + .getMountTableEntries(getRequest); + mountTable = getResponse.getEntries().get(0); + quotaUsage = mountTable.getQuota(); + // verify if ss quota is correctly set + assertEquals(2 * 1024 * 1024, quotaUsage.getSpaceQuota()); + + // test clrQuota command + argv = new String[] {"-clrQuota", src}; + assertEquals(0, ToolRunner.run(admin, argv)); + + stateStore.loadCache(MountTableStoreImpl.class, true); + getResponse = client.getMountTableManager() + .getMountTableEntries(getRequest); + mountTable = getResponse.getEntries().get(0); + quotaUsage = mountTable.getQuota(); + + // verify if quota unset successfully + assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getQuota()); + assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getSpaceQuota()); + } + + @Test + public void testManageSafeMode() throws Exception { + // ensure the Router become RUNNING state + waitState(RouterServiceState.RUNNING); + assertFalse(routerContext.getRouter().getRpcServer().isInSafeMode()); + assertEquals(0, ToolRunner.run(admin, + new 
String[] {"-safemode", "enter"})); + // verify state + assertEquals(RouterServiceState.SAFEMODE, + routerContext.getRouter().getRouterState()); + assertTrue(routerContext.getRouter().getRpcServer().isInSafeMode()); + + System.setOut(new PrintStream(out)); + assertEquals(0, ToolRunner.run(admin, + new String[] {"-safemode", "get"})); + assertTrue(out.toString().contains("true")); + + assertEquals(0, ToolRunner.run(admin, + new String[] {"-safemode", "leave"})); + // verify state + assertEquals(RouterServiceState.RUNNING, + routerContext.getRouter().getRouterState()); + assertFalse(routerContext.getRouter().getRpcServer().isInSafeMode()); + + out.reset(); + assertEquals(0, ToolRunner.run(admin, + new String[] {"-safemode", "get"})); + assertTrue(out.toString().contains("false")); + } + + @Test + public void testCreateInvalidEntry() throws Exception { + String[] argv = new String[] { + "-add", "test-createInvalidEntry", "ns0", "/createInvalidEntry"}; + assertEquals(-1, ToolRunner.run(admin, argv)); + + argv = new String[] { + "-add", "/test-createInvalidEntry", "ns0", "createInvalidEntry"}; + assertEquals(-1, ToolRunner.run(admin, argv)); + + argv = new String[] { + "-add", null, "ns0", "/createInvalidEntry"}; + assertEquals(-1, ToolRunner.run(admin, argv)); + + argv = new String[] { + "-add", "/test-createInvalidEntry", "ns0", null}; + assertEquals(-1, ToolRunner.run(admin, argv)); + + argv = new String[] { + "-add", "", "ns0", "/createInvalidEntry"}; + assertEquals(-1, ToolRunner.run(admin, argv)); + + argv = new String[] { + "-add", "/test-createInvalidEntry", null, "/createInvalidEntry"}; + assertEquals(-1, ToolRunner.run(admin, argv)); + + argv = new String[] { + "-add", "/test-createInvalidEntry", "", "/createInvalidEntry"}; + assertEquals(-1, ToolRunner.run(admin, argv)); + } + + /** + * Wait for the Router transforming to expected state. + * @param expectedState Expected Router state. 
+ * @throws Exception + */ + private void waitState(final RouterServiceState expectedState) + throws Exception { + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return expectedState == routerContext.getRouter().getRouterState(); + } + }, 1000, 30000); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterHeartbeatService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterHeartbeatService.java new file mode 100644 index 0000000..80f2327 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterHeartbeatService.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.store.RouterStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for router heartbeat service. 
+ */
public class TestRouterHeartbeatService {
  private Router router;
  private final String routerId = "router1";
  private TestingServer testingServer;
  private CuratorFramework curatorFramework;

  /**
   * Start an in-process ZooKeeper test server and a Router whose State Store
   * uses the ZooKeeper driver against that server.
   * @throws Exception If the server or the Router cannot be started.
   */
  @Before
  public void setup() throws Exception {
    router = new Router();
    router.setRouterId(routerId);
    Configuration conf = new Configuration();
    conf.setInt(RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, 1);
    Configuration routerConfig =
        new RouterConfigBuilder(conf).stateStore().build();
    routerConfig.setLong(RBFConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
        TimeUnit.HOURS.toMillis(1));
    routerConfig.setClass(RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
        StateStoreZooKeeperImpl.class, StateStoreDriver.class);

    testingServer = new TestingServer();
    String connectStr = testingServer.getConnectString();
    curatorFramework = CuratorFrameworkFactory.builder()
        .connectString(connectStr)
        .retryPolicy(new RetryNTimes(100, 100))
        .build();
    curatorFramework.start();
    routerConfig.set(CommonConfigurationKeys.ZK_ADDRESS, connectStr);
    router.init(routerConfig);
    router.start();

    // NOTE(review): waitStateStore is understood to take its timeout in
    // milliseconds; the previous toMicros(10) turned the intended 10 seconds
    // into roughly 2.8 hours - confirm against FederationStateStoreTestUtils.
    waitStateStore(router.getStateStore(), TimeUnit.SECONDS.toMillis(10));
  }

  /**
   * A heartbeat must not throw when the State Store driver is not ready.
   */
  @Test
  public void testStateStoreUnavailable() throws IOException {
    curatorFramework.close();
    testingServer.stop();
    router.getStateStore().stop();
    // The driver is not ready
    assertFalse(router.getStateStore().isDriverReady());

    // Do a heartbeat, and no exception thrown out
    RouterHeartbeatService heartbeatService =
        new RouterHeartbeatService(router);
    heartbeatService.updateStateStore();
  }

  /**
   * With a ready driver, a heartbeat creates the registration record of this
   * Router in the State Store.
   */
  @Test
  public void testStateStoreAvailable() throws Exception {
    // The driver is ready
    StateStoreService stateStore = router.getStateStore();
    assertTrue(stateStore.isDriverReady());
    RouterStore routerStore = router.getRouterStateManager();

    // No record about this router
    stateStore.refreshCaches(true);
    GetRouterRegistrationRequest request =
        GetRouterRegistrationRequest.newInstance(routerId);
    GetRouterRegistrationResponse response =
        routerStore.getRouterRegistration(request);
    RouterState routerState = response.getRouter();
    String id = routerState.getRouterId();
    StateStoreVersion version = routerState.getStateStoreVersion();
    assertNull(id);
    assertNull(version);

    // Do a heartbeat
    RouterHeartbeatService heartbeatService =
        new RouterHeartbeatService(router);
    heartbeatService.updateStateStore();

    // We should have a record
    stateStore.refreshCaches(true);
    request = GetRouterRegistrationRequest.newInstance(routerId);
    response = routerStore.getRouterRegistration(request);
    routerState = response.getRouter();
    id = routerState.getRouterId();
    version = routerState.getStateStoreVersion();
    assertNotNull(id);
    assertNotNull(version);
  }

  /**
   * Release the ZooKeeper client, the test server and the Router. Each step
   * runs even if an earlier one throws, so a failure to close one resource
   * does not leak the others into the next test.
   * @throws IOException If closing a resource fails.
   */
  @After
  public void tearDown() throws IOException {
    try {
      if (curatorFramework != null) {
        curatorFramework.close();
      }
    } finally {
      try {
        if (testingServer != null) {
          testingServer.stop();
        }
      } finally {
        if (router != null) {
          router.shutDown();
        }
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java new file mode 100644 index 0000000..8702b3c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Collections; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test a router end-to-end including the MountTable. 
+ */ +public class TestRouterMountTable { + + private static StateStoreDFSCluster cluster; + private static NamenodeContext nnContext; + private static RouterContext routerContext; + private static MountTableResolver mountTable; + + @BeforeClass + public static void globalSetUp() throws Exception { + + // Build and start a federated cluster + cluster = new StateStoreDFSCluster(false, 1); + Configuration conf = new RouterConfigBuilder() + .stateStore() + .admin() + .rpc() + .build(); + cluster.addRouterOverrides(conf); + cluster.startCluster(); + cluster.startRouters(); + cluster.waitClusterUp(); + + // Get the end points + nnContext = cluster.getRandomNamenode(); + routerContext = cluster.getRandomRouter(); + Router router = routerContext.getRouter(); + mountTable = (MountTableResolver) router.getSubclusterResolver(); + } + + @AfterClass + public static void tearDown() { + if (cluster != null) { + cluster.stopRouter(routerContext); + cluster.shutdown(); + cluster = null; + } + } + + @Test + public void testReadOnly() throws Exception { + + // Add a read only entry + MountTable readOnlyEntry = MountTable.newInstance( + "/readonly", Collections.singletonMap("ns0", "/testdir")); + readOnlyEntry.setReadOnly(true); + assertTrue(addMountTable(readOnlyEntry)); + + // Add a regular entry + MountTable regularEntry = MountTable.newInstance( + "/regular", Collections.singletonMap("ns0", "/testdir")); + assertTrue(addMountTable(regularEntry)); + + // Create a folder which should show in all locations + final FileSystem nnFs = nnContext.getFileSystem(); + final FileSystem routerFs = routerContext.getFileSystem(); + assertTrue(routerFs.mkdirs(new Path("/regular/newdir"))); + + FileStatus dirStatusNn = + nnFs.getFileStatus(new Path("/testdir/newdir")); + assertTrue(dirStatusNn.isDirectory()); + FileStatus dirStatusRegular = + routerFs.getFileStatus(new Path("/regular/newdir")); + assertTrue(dirStatusRegular.isDirectory()); + FileStatus dirStatusReadOnly = + 
routerFs.getFileStatus(new Path("/readonly/newdir")); + assertTrue(dirStatusReadOnly.isDirectory()); + + // It should fail writing into a read only path + try { + routerFs.mkdirs(new Path("/readonly/newdirfail")); + fail("We should not be able to write into a read only mount point"); + } catch (IOException ioe) { + String msg = ioe.getMessage(); + assertTrue(msg.startsWith( + "/readonly/newdirfail is in a read only mount point")); + } + } + + /** + * Add a mount table entry to the mount table through the admin API. + * @param entry Mount table entry to add. + * @return If it was succesfully added. + * @throws IOException Problems adding entries. + */ + private boolean addMountTable(final MountTable entry) throws IOException { + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTableManager = client.getMountTableManager(); + AddMountTableEntryRequest addRequest = + AddMountTableEntryRequest.newInstance(entry); + AddMountTableEntryResponse addResponse = + mountTableManager.addMountTableEntry(addRequest); + + // Reload the Router cache + mountTable.loadCache(true); + + return addResponse.getStatus(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java new file mode 100644 index 0000000..eabc0fe --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; +import org.apache.hadoop.util.Time; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test namenodes monitor behavior in the Router. 
+ */ +public class TestRouterNamenodeMonitoring { + + private static StateStoreDFSCluster cluster; + private static RouterContext routerContext; + private static MembershipNamenodeResolver resolver; + + private String ns0; + private String ns1; + private long initializedTime; + + @Before + public void setUp() throws Exception { + // Build and start a federated cluster with HA enabled + cluster = new StateStoreDFSCluster(true, 2); + // Enable heartbeat service and local heartbeat + Configuration routerConf = new RouterConfigBuilder() + .stateStore() + .admin() + .rpc() + .enableLocalHeartbeat(true) + .heartbeat() + .build(); + + // Specify local node (ns0.nn1) to monitor + StringBuilder sb = new StringBuilder(); + ns0 = cluster.getNameservices().get(0); + NamenodeContext context = cluster.getNamenodes(ns0).get(1); + routerConf.set(DFS_NAMESERVICE_ID, ns0); + routerConf.set(DFS_HA_NAMENODE_ID_KEY, context.getNamenodeId()); + + // Specify namenodes (ns1.nn0,ns1.nn1) to monitor + sb = new StringBuilder(); + ns1 = cluster.getNameservices().get(1); + for (NamenodeContext ctx : cluster.getNamenodes(ns1)) { + String suffix = ctx.getConfSuffix(); + if (sb.length() != 0) { + sb.append(","); + } + sb.append(suffix); + } + // override with the namenodes: ns1.nn0,ns1.nn1 + routerConf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString()); + + cluster.addRouterOverrides(routerConf); + cluster.startCluster(); + cluster.startRouters(); + cluster.waitClusterUp(); + + routerContext = cluster.getRandomRouter(); + resolver = (MembershipNamenodeResolver) routerContext.getRouter() + .getNamenodeResolver(); + initializedTime = Time.now(); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.stopRouter(routerContext); + cluster.shutdown(); + cluster = null; + } + } + + @Test + public void testNamenodeMonitoring() throws Exception { + // Set nn0 to active for all nameservices + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, "nn0"); + 
cluster.switchToStandby(ns, "nn1"); + } + + Collection<NamenodeHeartbeatService> heartbeatServices = routerContext + .getRouter().getNamenodeHearbeatServices(); + // manually trigger the heartbeat + for (NamenodeHeartbeatService service : heartbeatServices) { + service.periodicInvoke(); + } + + resolver.loadCache(true); + List<? extends FederationNamenodeContext> namespaceInfo0 = + resolver.getNamenodesForNameserviceId(ns0); + List<? extends FederationNamenodeContext> namespaceInfo1 = + resolver.getNamenodesForNameserviceId(ns1); + + // The modified date won't be updated in ns0.nn0 since it isn't + // monitored by the Router. + assertEquals("nn0", namespaceInfo0.get(1).getNamenodeId()); + assertTrue(namespaceInfo0.get(1).getDateModified() < initializedTime); + + // other namnodes should be updated as expected + assertEquals("nn1", namespaceInfo0.get(0).getNamenodeId()); + assertTrue(namespaceInfo0.get(0).getDateModified() > initializedTime); + + assertEquals("nn0", namespaceInfo1.get(0).getNamenodeId()); + assertTrue(namespaceInfo1.get(0).getDateModified() > initializedTime); + + assertEquals("nn1", namespaceInfo1.get(1).getNamenodeId()); + assertTrue(namespaceInfo1.get(1).getDateModified() > initializedTime); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java new file mode 100644 index 0000000..66a955e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java @@ -0,0 +1,451 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.QuotaUsage; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import 
org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Supplier;
+
+/**
+ * Tests quota behaviors in Router-based Federation.
+ */
+public class TestRouterQuota {
+  private static StateStoreDFSCluster cluster;
+  private static NamenodeContext nnContext1;
+  private static NamenodeContext nnContext2;
+  private static RouterContext routerContext;
+  private static MountTableResolver resolver;
+
+  private static final int BLOCK_SIZE = 512;
+
+  /**
+   * Start a two-nameservice federated cluster with a quota-enabled Router.
+   * @throws Exception If the cluster or the Routers cannot be started.
+   */
+  @Before
+  public void setUp() throws Exception {
+
+    // Build and start a federated cluster
+    cluster = new StateStoreDFSCluster(false, 2);
+    Configuration routerConf = new RouterConfigBuilder()
+        .stateStore()
+        .admin()
+        .quota()
+        .rpc()
+        .build();
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL, "2s");
+
+    // override some hdfs settings that used in testing space quota
+    Configuration hdfsConf = new Configuration(false);
+    hdfsConf.setInt(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    hdfsConf.setInt(HdfsClientConfigKeys.DFS_REPLICATION_KEY, 1);
+
+    cluster.addRouterOverrides(routerConf);
+    cluster.addNamenodeOverrides(hdfsConf);
+    cluster.startCluster();
+    cluster.startRouters();
+    cluster.waitClusterUp();
+
+    nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null);
+    nnContext2 = cluster.getNamenode(cluster.getNameservices().get(1), null);
+    routerContext = cluster.getRandomRouter();
+    Router router = routerContext.getRouter();
+    resolver = (MountTableResolver) router.getSubclusterResolver();
+  }
+
+  /**
+   * Stop the Router under test and shut the cluster down.
+   */
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.stopRouter(routerContext);
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  /**
+   * Keep creating directories through the Router until the namespace quota
+   * of the mount points is reported as exceeded, then check that the
+   * namenodes still accept direct writes.
+   * @throws Exception If the quota violation is not seen before the timeout.
+   */
+  @Test
+  public void testNamespaceQuotaExceed() throws Exception {
+    long nsQuota = 3;
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+    final FileSystem nnFs2 = nnContext2.getFileSystem();
+
+    // Add two mount tables:
+    // /nsquota --> ns0---testdir1
+    // /nsquota/subdir --> ns1---testdir2
+    nnFs1.mkdirs(new Path("/testdir1"));
+    nnFs2.mkdirs(new Path("/testdir2"));
+    MountTable mountTable1 = MountTable.newInstance("/nsquota",
+        Collections.singletonMap("ns0", "/testdir1"));
+
+    mountTable1.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota).build());
+    addMountTable(mountTable1);
+
+    MountTable mountTable2 = MountTable.newInstance("/nsquota/subdir",
+        Collections.singletonMap("ns1", "/testdir2"));
+    mountTable2.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota).build());
+    addMountTable(mountTable2);
+
+    final FileSystem routerFs = routerContext.getFileSystem();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+
+      @Override
+      public Boolean get() {
+        boolean isNsQuotaViolated = false;
+        try {
+          // create new directory to trigger NSQuotaExceededException
+          routerFs.mkdirs(new Path("/nsquota/" + UUID.randomUUID()));
+          routerFs.mkdirs(new Path("/nsquota/subdir/" + UUID.randomUUID()));
+        } catch (NSQuotaExceededException e) {
+          isNsQuotaViolated = true;
+        } catch (IOException ignored) {
+          // Any other failure is not the condition waited for; the
+          // supplier reports false and waitFor polls again.
+        }
+        return isNsQuotaViolated;
+      }
+    }, 5000, 60000);
+    // mkdir in real FileSystem should be okay
+    nnFs1.mkdirs(new Path("/testdir1/" + UUID.randomUUID()));
+    nnFs2.mkdirs(new Path("/testdir2/" + UUID.randomUUID()));
+  }
+
+  /**
+   * Keep appending data through the Router until the storage space quota
+   * of the mount points is reported as exceeded, then check that the
+   * namenodes still accept direct appends.
+   * @throws Exception If the quota violation is not seen before the timeout.
+   */
+  @Test
+  public void testStorageSpaceQuotaaExceed() throws Exception {
+    long ssQuota = 3071;
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+    final FileSystem nnFs2 = nnContext2.getFileSystem();
+
+    // Add two mount tables:
+    // /ssquota --> ns0---testdir3
+    // /ssquota/subdir --> ns1---testdir4
+    nnFs1.mkdirs(new Path("/testdir3"));
+    nnFs2.mkdirs(new Path("/testdir4"));
+    MountTable mountTable1 = MountTable.newInstance("/ssquota",
+        Collections.singletonMap("ns0", "/testdir3"));
+
+    mountTable1
+        .setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota).build());
+    addMountTable(mountTable1);
+
+    MountTable mountTable2 = MountTable.newInstance("/ssquota/subdir",
+        Collections.singletonMap("ns1", "/testdir4"));
+    mountTable2
+        .setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota).build());
+    addMountTable(mountTable2);
+
+    final DFSClient routerClient = routerContext.getClient();
+    routerClient.create("/ssquota/file", true).close();
+    routerClient.create("/ssquota/subdir/file", true).close();
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+
+      @Override
+      public Boolean get() {
+        boolean isDsQuotaViolated = false;
+        try {
+          // append data to trigger DSQuotaExceededException
+          appendData("/ssquota/file", routerClient, BLOCK_SIZE);
+          appendData("/ssquota/subdir/file", routerClient, BLOCK_SIZE);
+        } catch (DSQuotaExceededException e) {
+          isDsQuotaViolated = true;
+        } catch (IOException ignored) {
+          // Any other failure is not the condition waited for; the
+          // supplier reports false and waitFor polls again.
+        }
+        return isDsQuotaViolated;
+      }
+    }, 5000, 60000);
+
+    // append data to destination path in real FileSystem should be okay
+    appendData("/testdir3/file", nnContext1.getClient(), BLOCK_SIZE);
+    appendData("/testdir4/file", nnContext2.getClient(), BLOCK_SIZE);
+  }
+
+  /**
+   * Add a mount table entry to the mount table through the admin API.
+   * @param entry Mount table entry to add.
+   * @return If it was successfully added.
+   * @throws IOException Problems adding entries.
+   */
+  private boolean addMountTable(final MountTable entry) throws IOException {
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTableManager = client.getMountTableManager();
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(entry);
+    AddMountTableEntryResponse addResponse =
+        mountTableManager.addMountTableEntry(addRequest);
+
+    // Reload the Router cache
+    resolver.loadCache(true);
+
+    return addResponse.getStatus();
+  }
+
+  /**
+   * Append data in specified file. The stream is always closed, also when
+   * the write fails.
+   * @param path Path of file.
+   * @param client DFS Client.
+   * @param dataLen The length of write data.
+   * @throws IOException If the file cannot be opened for append, written
+   *           or closed.
+   */
+  private void appendData(String path, DFSClient client, int dataLen)
+      throws IOException {
+    EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.APPEND);
+    try (HdfsDataOutputStream stream =
+        client.append(path, 1024, createFlag, null, null)) {
+      stream.write(new byte[dataLen]);
+    }
+  }
+
+  /**
+   * Check that, once the quota update service has run, the quota set on a
+   * mount entry is reported by the namenodes for its destination and for
+   * the destination of a nested mount entry that has no quota of its own.
+   * @throws Exception If the mount entries or the quota cannot be set.
+   */
+  @Test
+  public void testSetQuota() throws Exception {
+    long nsQuota = 5;
+    long ssQuota = 100;
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+    final FileSystem nnFs2 = nnContext2.getFileSystem();
+
+    // Add two mount tables:
+    // /setquota --> ns0---testdir5
+    // /setquota/subdir --> ns1---testdir6
+    nnFs1.mkdirs(new Path("/testdir5"));
+    nnFs2.mkdirs(new Path("/testdir6"));
+    MountTable mountTable1 = MountTable.newInstance("/setquota",
+        Collections.singletonMap("ns0", "/testdir5"));
+    mountTable1
+        .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+        .spaceQuota(ssQuota).build());
+    addMountTable(mountTable1);
+
+    // don't set quota for subpath of mount table
+    MountTable mountTable2 = MountTable.newInstance("/setquota/subdir",
+        Collections.singletonMap("ns1", "/testdir6"));
+    addMountTable(mountTable2);
+
+    RouterQuotaUpdateService updateService = routerContext.getRouter()
+        .getQuotaCacheUpdateService();
+    // ensure setQuota RPC call was invoked
+    updateService.periodicInvoke();
+
+    ClientProtocol client1 = nnContext1.getClient().getNamenode();
+    ClientProtocol client2 = nnContext2.getClient().getNamenode();
+    final QuotaUsage quota1 = client1.getQuotaUsage("/testdir5");
+    final QuotaUsage quota2 = client2.getQuotaUsage("/testdir6");
+
+    assertEquals(nsQuota, quota1.getQuota());
+    assertEquals(ssQuota, quota1.getSpaceQuota());
+    assertEquals(nsQuota, quota2.getQuota());
+    assertEquals(ssQuota, quota2.getSpaceQuota());
+  }
+
+  /**
+   * Check that the quota usage reported by the Router for a mount point
+   * aggregates the usage of the mount points nested under it.
+   * @throws Exception If the mount entries or the files cannot be created.
+   */
+  @Test
+  public void testGetQuota() throws Exception {
+    long nsQuota = 10;
+    long ssQuota = 100;
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+    final FileSystem nnFs2 = nnContext2.getFileSystem();
+
+    // Add two mount tables:
+    // /getquota --> ns0---/testdir7
+    // /getquota/subdir1 --> ns0---/testdir7/subdir
+    // /getquota/subdir2 --> ns1---/testdir8
+    nnFs1.mkdirs(new Path("/testdir7"));
+    nnFs1.mkdirs(new Path("/testdir7/subdir"));
+    nnFs2.mkdirs(new Path("/testdir8"));
+    MountTable mountTable1 = MountTable.newInstance("/getquota",
+        Collections.singletonMap("ns0", "/testdir7"));
+    mountTable1
+        .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+        .spaceQuota(ssQuota).build());
+    addMountTable(mountTable1);
+
+    MountTable mountTable2 = MountTable.newInstance("/getquota/subdir1",
+        Collections.singletonMap("ns0", "/testdir7/subdir"));
+    addMountTable(mountTable2);
+
+    MountTable mountTable3 = MountTable.newInstance("/getquota/subdir2",
+        Collections.singletonMap("ns1", "/testdir8"));
+    addMountTable(mountTable3);
+
+    // use router client to create new files
+    DFSClient routerClient = routerContext.getClient();
+    routerClient.create("/getquota/file", true).close();
+    routerClient.create("/getquota/subdir1/file", true).close();
+    routerClient.create("/getquota/subdir2/file", true).close();
+
+    ClientProtocol clientProtocol = routerContext.getClient().getNamenode();
+    RouterQuotaUpdateService updateService = routerContext.getRouter()
+        .getQuotaCacheUpdateService();
+    updateService.periodicInvoke();
+    final QuotaUsage quota = clientProtocol.getQuotaUsage("/getquota");
+    // the quota should be aggregated
+    assertEquals(6, quota.getFileAndDirectoryCount());
+  }
+
+  /**
+   * Check that the quota manager drops the cached quota of a mount point
+   * after the mount entry is removed and the update service has run.
+   * @throws Exception If the mount entry cannot be added or removed.
+   */
+  @Test
+  public void testStaleQuotaRemoving() throws Exception {
+    long nsQuota = 20;
+    long ssQuota = 200;
+    String stalePath = "/stalequota";
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+
+    // Add one mount table:
+    // /stalequota --> ns0---/testdir9
+    nnFs1.mkdirs(new Path("/testdir9"));
+    MountTable mountTable = MountTable.newInstance(stalePath,
+        Collections.singletonMap("ns0", "/testdir9"));
+    mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+        .spaceQuota(ssQuota).build());
+    addMountTable(mountTable);
+
+    // Call periodicInvoke to ensure quota for stalePath was
+    // loaded into quota manager.
+    RouterQuotaUpdateService updateService = routerContext.getRouter()
+        .getQuotaCacheUpdateService();
+    updateService.periodicInvoke();
+
+    // use quota manager to get its quota usage and do verification
+    RouterQuotaManager quotaManager = routerContext.getRouter()
+        .getQuotaManager();
+    RouterQuotaUsage quota = quotaManager.getQuotaUsage(stalePath);
+    assertEquals(nsQuota, quota.getQuota());
+    assertEquals(ssQuota, quota.getSpaceQuota());
+
+    // remove stale path entry
+    removeMountTable(stalePath);
+    updateService.periodicInvoke();
+    // the stale entry should be removed and we will get null
+    quota = quotaManager.getQuotaUsage(stalePath);
+    assertNull(quota);
+  }
+
+  /**
+   * Remove a mount table entry from the mount table through the admin API.
+   * @param path Source path of the mount table entry to remove.
+   * @return If it was successfully removed.
+   * @throws IOException Problems removing entries.
+   */
+  private boolean removeMountTable(String path) throws IOException {
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTableManager = client.getMountTableManager();
+    RemoveMountTableEntryRequest removeRequest = RemoveMountTableEntryRequest
+        .newInstance(path);
+    RemoveMountTableEntryResponse removeResponse = mountTableManager
+        .removeMountTableEntry(removeRequest);
+
+    // Reload the Router cache
+    resolver.loadCache(true);
+    return removeResponse.getStatus();
+  }
+
+  /**
+   * Check that the quota usage stored with a mount entry follows the files
+   * and directories created under the mount point.
+   * @throws Exception If the mount entry or the files cannot be created.
+   */
+  @Test
+  public void testQuotaUpdating() throws Exception {
+    long nsQuota = 30;
+    long ssQuota = 1024;
+    String path = "/updatequota";
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+
+    // Add one mount table:
+    // /updatequota --> ns0---/testdir10
+    nnFs1.mkdirs(new Path("/testdir10"));
+    MountTable mountTable = MountTable.newInstance(path,
+        Collections.singletonMap("ns0", "/testdir10"));
+    mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+        .spaceQuota(ssQuota).build());
+    addMountTable(mountTable);
+
+    // Call periodicInvoke to ensure quota updated in quota manager
+    // and state store.
+    RouterQuotaUpdateService updateService = routerContext.getRouter()
+        .getQuotaCacheUpdateService();
+    updateService.periodicInvoke();
+
+    // verify initial quota value
+    RouterQuotaUsage quota = getFirstMountTable(path).getQuota();
+    assertEquals(nsQuota, quota.getQuota());
+    assertEquals(ssQuota, quota.getSpaceQuota());
+    assertEquals(1, quota.getFileAndDirectoryCount());
+    assertEquals(0, quota.getSpaceConsumed());
+
+    // mkdir and write a new file
+    final FileSystem routerFs = routerContext.getFileSystem();
+    routerFs.mkdirs(new Path(path + "/" + UUID.randomUUID()));
+    DFSClient routerClient = routerContext.getClient();
+    routerClient.create(path + "/file", true).close();
+    appendData(path + "/file", routerClient, BLOCK_SIZE);
+
+    updateService.periodicInvoke();
+    quota = getFirstMountTable(path).getQuota();
+
+    // verify if quota has been updated in state store
+    assertEquals(nsQuota, quota.getQuota());
+    assertEquals(ssQuota, quota.getSpaceQuota());
+    assertEquals(3, quota.getFileAndDirectoryCount());
+    assertEquals(BLOCK_SIZE, quota.getSpaceConsumed());
+  }
+
+  /**
+   * Get the first mount table entry returned for a path, failing the test
+   * with a clear message instead of a NullPointerException when none is
+   * returned.
+   * @param path Mount table entry to get.
+   * @return First mount table entry returned for the path.
+   * @throws IOException Problems getting entries.
+   */
+  private MountTable getFirstMountTable(String path) throws IOException {
+    List<MountTable> results = getMountTable(path);
+    if (results.isEmpty()) {
+      throw new AssertionError("No mount table entry found for " + path);
+    }
+    return results.get(0);
+  }
+
+  /**
+   * Get the mount table entries of specified path through the admin API.
+   * @param path Mount table entry to get.
+   * @return Mount table entries returned by the admin API for the path.
+   * @throws IOException Problems getting entries.
+   */
+  private List<MountTable> getMountTable(String path) throws IOException {
+    // Reload the Router cache
+    resolver.loadCache(true);
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTableManager = client.getMountTableManager();
+    GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
+        .newInstance(path);
+    GetMountTableEntriesResponse getResponse = mountTableManager
+        .getMountTableEntries(getRequest);
+
+    return getResponse.getEntries();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java new file mode 100644 index 0000000..ce3ee17 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Set; + +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for class {@link RouterQuotaManager}. + */ +public class TestRouterQuotaManager { + private static RouterQuotaManager manager; + + @Before + public void setup() { + manager = new RouterQuotaManager(); + } + + @After + public void cleanup() { + manager.clear(); + } + + @Test + public void testGetChildrenPaths() { + RouterQuotaUsage quotaUsage = new RouterQuotaUsage.Builder().build(); + manager.put("/path1", quotaUsage); + manager.put("/path2", quotaUsage); + manager.put("/path1/subdir", quotaUsage); + manager.put("/path1/subdir/subdir", quotaUsage); + + Set<String> childrenPaths = manager.getPaths("/path1"); + assertEquals(3, childrenPaths.size()); + assertTrue(childrenPaths.contains("/path1/subdir") + && childrenPaths.contains("/path1/subdir/subdir") + && childrenPaths.contains("/path1")); + + // test for corner case + manager.put("/path3", quotaUsage); + manager.put("/path3/subdir", quotaUsage); + manager.put("/path3-subdir", quotaUsage); + + childrenPaths = manager.getPaths("/path3"); + assertEquals(2, childrenPaths.size()); + // path /path3-subdir should not be returned + assertTrue(childrenPaths.contains("/path3") + && childrenPaths.contains("/path3/subdir") + && !childrenPaths.contains("/path3-subdir")); + } + + @Test + public void testGetQuotaUsage() { + RouterQuotaUsage quotaGet; + + // test case1: get quota with an non-exist path + quotaGet = manager.getQuotaUsage("/non-exist-path"); + assertNull(quotaGet); + + // test case2: get quota from an no-quota set path + 
RouterQuotaUsage.Builder quota = new RouterQuotaUsage.Builder() + .quota(HdfsConstants.QUOTA_DONT_SET) + .spaceQuota(HdfsConstants.QUOTA_DONT_SET); + manager.put("/noQuotaSet", quota.build()); + quotaGet = manager.getQuotaUsage("/noQuotaSet"); + // it should return null + assertNull(quotaGet); + + // test case3: get quota from an quota-set path + quota.quota(1); + quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET); + manager.put("/hasQuotaSet", quota.build()); + quotaGet = manager.getQuotaUsage("/hasQuotaSet"); + assertEquals(1, quotaGet.getQuota()); + assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota()); + + // test case4: get quota with an non-exist child path + quotaGet = manager.getQuotaUsage("/hasQuotaSet/file"); + // it will return the nearest ancestor which quota was set + assertEquals(1, quotaGet.getQuota()); + assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota()); + + // test case5: get quota with an child path which its parent + // wasn't quota set + quota.quota(HdfsConstants.QUOTA_DONT_SET); + quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET); + manager.put("/hasQuotaSet/noQuotaSet", quota.build()); + // here should returns the quota of path /hasQuotaSet + // (the nearest ancestor which quota was set) + quotaGet = manager.getQuotaUsage("/hasQuotaSet/noQuotaSet/file"); + assertEquals(1, quotaGet.getQuota()); + assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota()); + + // test case6: get quota with an child path which its parent was quota set + quota.quota(2); + quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET); + manager.put("/hasQuotaSet/hasQuotaSet", quota.build()); + // here should return the quota of path /hasQuotaSet/hasQuotaSet + quotaGet = manager.getQuotaUsage("/hasQuotaSet/hasQuotaSet/file"); + assertEquals(2, quotaGet.getQuota()); + assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota()); + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java new file mode 100644 index 0000000..61e7657 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test retry behavior of the Router RPC Client. 
+ */ +public class TestRouterRPCClientRetries { + + private static StateStoreDFSCluster cluster; + private static NamenodeContext nnContext1; + private static RouterContext routerContext; + private static MembershipNamenodeResolver resolver; + private static ClientProtocol routerProtocol; + + @Before + public void setUp() throws Exception { + // Build and start a federated cluster + cluster = new StateStoreDFSCluster(false, 2); + Configuration routerConf = new RouterConfigBuilder() + .stateStore() + .admin() + .rpc() + .build(); + + // reduce IPC client connection retry times and interval time + Configuration clientConf = new Configuration(false); + clientConf.setInt( + CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); + clientConf.setInt( + CommonConfigurationKeys.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, 100); + + cluster.addRouterOverrides(routerConf); + // override some settings for the client + cluster.startCluster(clientConf); + cluster.startRouters(); + cluster.waitClusterUp(); + + nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null); + routerContext = cluster.getRandomRouter(); + resolver = (MembershipNamenodeResolver) routerContext.getRouter() + .getNamenodeResolver(); + routerProtocol = routerContext.getClient().getNamenode(); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.stopRouter(routerContext); + cluster.shutdown(); + cluster = null; + } + } + + @Test + public void testRetryWhenAllNameServiceDown() throws Exception { + // shutdown the dfs cluster + MiniDFSCluster dfsCluster = cluster.getCluster(); + dfsCluster.shutdown(); + + // register an invalid namenode report + registerInvalidNameReport(); + + // Create a directory via the router + String dirPath = "/testRetryWhenClusterisDown"; + FsPermission permission = new FsPermission("705"); + try { + routerProtocol.mkdirs(dirPath, permission, false); + fail("Should have thrown RemoteException error."); + } catch (RemoteException e) { + String 
ns0 = cluster.getNameservices().get(0); + GenericTestUtils.assertExceptionContains( + "No namenode available under nameservice " + ns0, e); + } + + // Verify the retry times, it should only retry one time. + FederationRPCMetrics rpcMetrics = routerContext.getRouter() + .getRpcServer().getRPCMetrics(); + assertEquals(1, rpcMetrics.getProxyOpRetries()); + } + + @Test + public void testRetryWhenOneNameServiceDown() throws Exception { + // shutdown the dfs cluster + MiniDFSCluster dfsCluster = cluster.getCluster(); + dfsCluster.shutdownNameNode(0); + + // register an invalid namenode report + registerInvalidNameReport(); + + DFSClient client = nnContext1.getClient(); + // Renew lease for the DFS client, it will succeed. + routerProtocol.renewLease(client.getClientName()); + + // Verify the retry times, it will retry one time for ns0. + FederationRPCMetrics rpcMetrics = routerContext.getRouter() + .getRpcServer().getRPCMetrics(); + assertEquals(1, rpcMetrics.getProxyOpRetries()); + } + + /** + * Register an invalid namenode report. + * @throws IOException + */ + private void registerInvalidNameReport() throws IOException { + String ns0 = cluster.getNameservices().get(0); + List<? extends FederationNamenodeContext> origin = resolver + .getNamenodesForNameserviceId(ns0); + FederationNamenodeContext nnInfo = origin.get(0); + NamenodeStatusReport report = new NamenodeStatusReport(ns0, + nnInfo.getNamenodeId(), nnInfo.getRpcAddress(), + nnInfo.getServiceAddress(), nnInfo.getLifelineAddress(), + nnInfo.getWebAddress()); + report.setRegistrationValid(false); + assertTrue(resolver.registerNamenode(report)); + resolver.loadCache(true); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org