http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java deleted file mode 100644 index 5489691..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java +++ /dev/null @@ -1,216 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.router; - -import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile; -import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists; -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.lang.reflect.Method; -import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Random; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.DirectoryListing; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.server.federation.MockResolver; -import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster; -import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext; -import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext; -import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; -import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; -import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; - -/** - * Test the RPC interface of the {@link Router} implemented by - * {@link RouterRpcServer} when mount points map to multiple destinations. 
- */ -public class TestRouterRpcMultiDestination extends TestRouterRpc { - - @Override - public void testSetup() throws Exception { - - RouterDFSCluster cluster = getCluster(); - - // Create mock locations - getCluster().installMockLocations(); - List<RouterContext> routers = cluster.getRouters(); - - // Add extra location to the root mount / such that the root mount points: - // / - // ns0 -> / - // ns1 -> / - for (RouterContext rc : routers) { - Router router = rc.getRouter(); - MockResolver resolver = (MockResolver) router.getSubclusterResolver(); - resolver.addLocation("/", cluster.getNameservices().get(1), "/"); - } - - // Create a mount that points to 2 dirs in the same ns: - // /same - // ns0 -> / - // ns0 -> /target-ns0 - for (RouterContext rc : routers) { - Router router = rc.getRouter(); - MockResolver resolver = (MockResolver) router.getSubclusterResolver(); - List<String> nss = cluster.getNameservices(); - String ns0 = nss.get(0); - resolver.addLocation("/same", ns0, "/"); - resolver.addLocation("/same", ns0, cluster.getNamenodePathForNS(ns0)); - } - - // Delete all files via the NNs and verify - cluster.deleteAllFiles(); - - // Create test fixtures on NN - cluster.createTestDirectoriesNamenode(); - - // Wait to ensure NN has fully created its test directories - Thread.sleep(100); - - // Pick a NS, namenode and getRouter() for this test - RouterContext router = cluster.getRandomRouter(); - this.setRouter(router); - - String ns = cluster.getRandomNameservice(); - this.setNs(ns); - this.setNamenode(cluster.getNamenode(ns, null)); - - // Create a test file on a single NN that is accessed via a getRouter() path - // with 2 destinations. All tests should failover to the alternate - // destination if the wrong NN is attempted first. 
- Random r = new Random(); - String randomString = "testfile-" + r.nextInt(); - setNamenodeFile("/" + randomString); - setRouterFile("/" + randomString); - - FileSystem nnFs = getNamenodeFileSystem(); - FileSystem routerFs = getRouterFileSystem(); - createFile(nnFs, getNamenodeFile(), 32); - - verifyFileExists(nnFs, getNamenodeFile()); - verifyFileExists(routerFs, getRouterFile()); - } - - private void testListing(String path) throws IOException { - - // Collect the mount table entries for this path - Set<String> requiredPaths = new TreeSet<>(); - RouterContext rc = getRouterContext(); - Router router = rc.getRouter(); - FileSubclusterResolver subclusterResolver = router.getSubclusterResolver(); - for (String mount : subclusterResolver.getMountPoints(path)) { - requiredPaths.add(mount); - } - - // Get files/dirs from the Namenodes - PathLocation location = subclusterResolver.getDestinationForPath(path); - for (RemoteLocation loc : location.getDestinations()) { - String nsId = loc.getNameserviceId(); - String dest = loc.getDest(); - NamenodeContext nn = getCluster().getNamenode(nsId, null); - FileSystem fs = nn.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path(dest)); - for (FileStatus file : files) { - String pathName = file.getPath().getName(); - requiredPaths.add(pathName); - } - } - - // Get files/dirs from the Router - DirectoryListing listing = - getRouterProtocol().getListing(path, HdfsFileStatus.EMPTY_NAME, false); - Iterator<String> requiredPathsIterator = requiredPaths.iterator(); - - // Match each path returned and verify order returned - HdfsFileStatus[] partialListing = listing.getPartialListing(); - for (HdfsFileStatus fileStatus : listing.getPartialListing()) { - String fileName = requiredPathsIterator.next(); - String currentFile = fileStatus.getFullPath(new Path(path)).getName(); - assertEquals(currentFile, fileName); - } - - // Verify the total number of results found/matched - assertEquals( - requiredPaths + " doesn't match " + 
Arrays.toString(partialListing), - requiredPaths.size(), partialListing.length); - } - - @Override - public void testProxyListFiles() throws IOException, InterruptedException, - URISyntaxException, NoSuchMethodException, SecurityException { - - // Verify that the root listing is a union of the mount table destinations - // and the files stored at all nameservices mounted at the root (ns0 + ns1) - // / --> - // /ns0 (from mount table) - // /ns1 (from mount table) - // /same (from the mount table) - // all items in / of ns0 from mapping of / -> ns0:::/) - // all items in / of ns1 from mapping of / -> ns1:::/) - testListing("/"); - - // Verify that the "/same" mount point lists the contents of both dirs in - // the same ns - // /same --> - // /target-ns0 (from root of ns0) - // /testdir (from contents of /target-ns0) - testListing("/same"); - - // List a non-existing path and validate error response with NN behavior - ClientProtocol namenodeProtocol = - getCluster().getRandomNamenode().getClient().getNamenode(); - Method m = ClientProtocol.class.getMethod( - "getListing", String.class, byte[].class, boolean.class); - String badPath = "/unknownlocation/unknowndir"; - compareResponses(getRouterProtocol(), namenodeProtocol, m, - new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false}); - } - - @Override - public void testProxyRenameFiles() throws IOException, InterruptedException { - - super.testProxyRenameFiles(); - - List<String> nss = getCluster().getNameservices(); - String ns0 = nss.get(0); - String ns1 = nss.get(1); - - // Rename a file from ns0 into the root (mapped to both ns0 and ns1) - String testDir0 = getCluster().getFederatedTestDirectoryForNS(ns0); - String filename0 = testDir0 + "/testrename"; - String renamedFile = "/testrename"; - testRename(getRouterContext(), filename0, renamedFile, false); - testRename2(getRouterContext(), filename0, renamedFile, false); - - // Rename a file from ns1 into the root (mapped to both ns0 and ns1) - String testDir1 = 
getCluster().getFederatedTestDirectoryForNS(ns1); - String filename1 = testDir1 + "/testrename"; - testRename(getRouterContext(), filename1, renamedFile, false); - testRename2(getRouterContext(), filename1, renamedFile, false); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java deleted file mode 100644 index e05f727..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java +++ /dev/null @@ -1,201 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.router; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION; -import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.deleteStateStore; -import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; -import org.apache.hadoop.service.Service.STATE; -import org.apache.hadoop.util.Time; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * Test the safe mode for the {@link Router} controlled by - * {@link RouterSafemodeService}. 
- */ -public class TestRouterSafemode { - - private Router router; - private static Configuration conf; - - @BeforeClass - public static void create() throws IOException { - // Wipe state store - deleteStateStore(); - // Configuration that supports the state store - conf = getStateStoreConfiguration(); - // 2 sec startup standby - conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION, - TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS); - // 1 sec cache refresh - conf.setTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, - TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS); - // 2 sec post cache update before entering safemode (2 intervals) - conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION, - TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS); - - conf.set(DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0"); - conf.set(DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0"); - conf.set(DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0"); - conf.set(DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0"); - conf.set(DFSConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, "127.0.0.1:0"); - conf.set(DFSConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_KEY, "127.0.0.1:0"); - - // RPC + State Store + Safe Mode only - conf = new RouterConfigBuilder(conf) - .rpc() - .safemode() - .stateStore() - .metrics() - .build(); - } - - @AfterClass - public static void destroy() { - } - - @Before - public void setup() throws IOException, URISyntaxException { - router = new Router(); - router.init(conf); - router.start(); - } - - @After - public void cleanup() throws IOException { - if (router != null) { - router.stop(); - router = null; - } - } - - @Test - public void testSafemodeService() throws IOException { - RouterSafemodeService server = new RouterSafemodeService(router); - server.init(conf); - assertEquals(STATE.INITED, server.getServiceState()); - server.start(); - assertEquals(STATE.STARTED, server.getServiceState()); - server.stop(); - assertEquals(STATE.STOPPED, 
server.getServiceState()); - server.close(); - } - - @Test - public void testRouterExitSafemode() - throws InterruptedException, IllegalStateException, IOException { - - assertTrue(router.getRpcServer().isInSafeMode()); - verifyRouter(RouterServiceState.SAFEMODE); - - // Wait for initial time in milliseconds - long interval = - conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION, - TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) + - conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, - TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS); - Thread.sleep(interval); - - assertFalse(router.getRpcServer().isInSafeMode()); - verifyRouter(RouterServiceState.RUNNING); - } - - @Test - public void testRouterEnterSafemode() - throws IllegalStateException, IOException, InterruptedException { - - // Verify starting state - assertTrue(router.getRpcServer().isInSafeMode()); - verifyRouter(RouterServiceState.SAFEMODE); - - // We should be in safe mode for DFS_ROUTER_SAFEMODE_EXTENSION time - long interval0 = conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION, - TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) - 1000; - long t0 = Time.now(); - while (Time.now() - t0 < interval0) { - verifyRouter(RouterServiceState.SAFEMODE); - Thread.sleep(100); - } - - // We wait some time for the state to propagate - long interval1 = 1000 + 2 * conf.getTimeDuration( - DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, TimeUnit.SECONDS.toMillis(1), - TimeUnit.MILLISECONDS); - Thread.sleep(interval1); - - // Running - assertFalse(router.getRpcServer().isInSafeMode()); - verifyRouter(RouterServiceState.RUNNING); - - // Disable cache - router.getStateStore().stopCacheUpdateService(); - - // Wait until the State Store cache is stale in milliseconds - long interval2 = - conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION, - TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) + - conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, - TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS); - 
Thread.sleep(interval2); - - // Safemode - assertTrue(router.getRpcServer().isInSafeMode()); - verifyRouter(RouterServiceState.SAFEMODE); - } - - @Test - public void testRouterRpcSafeMode() - throws IllegalStateException, IOException { - - assertTrue(router.getRpcServer().isInSafeMode()); - verifyRouter(RouterServiceState.SAFEMODE); - - // If the Router is in Safe Mode, we should get a SafeModeException - boolean exception = false; - try { - router.getRpcServer().delete("/testfile.txt", true); - fail("We should have thrown a safe mode exception"); - } catch (RouterSafeModeException sme) { - exception = true; - } - assertTrue("We should have thrown a safe mode exception", exception); - } - - private void verifyRouter(RouterServiceState status) - throws IllegalStateException, IOException { - assertEquals(status, router.getRouterState()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java deleted file mode 100644 index def3935..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java +++ /dev/null @@ -1,274 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS; -import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl.FEDERATION_STORE_FILE_DIRECTORY; -import static org.junit.Assert.assertNotNull; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; -import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; -import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl; -import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl; -import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; -import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; -import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats; -import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; -import org.apache.hadoop.util.Time; - -/** - * Utilities to test the State Store. 
- */ -public final class FederationStateStoreTestUtils { - - /** The State Store Driver implementation class for testing. */ - private static final Class<? extends StateStoreDriver> - FEDERATION_STORE_DRIVER_CLASS_FOR_TEST = StateStoreFileImpl.class; - - private FederationStateStoreTestUtils() { - // Utility Class - } - - /** - * Get the State Store driver implementation for testing. - * - * @return Class of the State Store driver implementation. - */ - public static Class<? extends StateStoreDriver> getTestDriverClass() { - return FEDERATION_STORE_DRIVER_CLASS_FOR_TEST; - } - - /** - * Create a default State Store configuration. - * - * @return State Store configuration. - */ - public static Configuration getStateStoreConfiguration() { - Class<? extends StateStoreDriver> clazz = getTestDriverClass(); - return getStateStoreConfiguration(clazz); - } - - /** - * Create a new State Store configuration for a particular driver. Drivers - * derived from {@link StateStoreFileBaseImpl} also get the file directory - * set through {@link #setFileConfiguration(Configuration)}. - * - * @param clazz Class of the driver to create. - * @return State Store configuration. - */ - public static Configuration getStateStoreConfiguration( - Class<? extends StateStoreDriver> clazz) { - Configuration conf = new HdfsConfiguration(false); - - conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); - conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs://test"); - - conf.setClass(FEDERATION_STORE_DRIVER_CLASS, clazz, StateStoreDriver.class); - - // Same check as deleteStateStore(): true when clazz is a file-based driver - if (StateStoreFileBaseImpl.class.isAssignableFrom(clazz)) { - setFileConfiguration(conf); - } - return conf; - } - - /** - * Create a new State Store based on a configuration. - * - * @param configuration Configuration for the State Store. - * @return New State Store service. - * @throws IOException If it cannot create the State Store. - * @throws InterruptedException If we cannot wait for the store to start. 
- */ - public static StateStoreService newStateStore( - Configuration configuration) throws IOException, InterruptedException { - - StateStoreService stateStore = new StateStoreService(); - assertNotNull(stateStore); - - // Set unique identifier, this is normally the router address - String identifier = UUID.randomUUID().toString(); - stateStore.setIdentifier(identifier); - - stateStore.init(configuration); - stateStore.start(); - - // Wait for state store to connect - waitStateStore(stateStore, TimeUnit.SECONDS.toMillis(10)); - - return stateStore; - } - - /** - * Wait for the State Store to initialize its driver. - * - * @param stateStore State Store. - * @param timeoutMs Time out in milliseconds. - * @throws IOException If the State Store cannot be reached. - * @throws InterruptedException If the sleep is interrupted. - */ - public static void waitStateStore(StateStoreService stateStore, - long timeoutMs) throws IOException, InterruptedException { - long startingTime = Time.monotonicNow(); - while (!stateStore.isDriverReady()) { - Thread.sleep(100); - if (Time.monotonicNow() - startingTime > timeoutMs) { - throw new IOException("Timeout waiting for State Store to connect"); - } - } - } - - /** - * Delete the default State Store. - * - * @throws IOException - */ - public static void deleteStateStore() throws IOException { - Class<? extends StateStoreDriver> driverClass = getTestDriverClass(); - deleteStateStore(driverClass); - } - - /** - * Delete the State Store. - * @param driverClass Class of the State Store driver implementation. - * @throws IOException If it cannot be deleted. - */ - public static void deleteStateStore( - Class<? 
extends StateStoreDriver> driverClass) throws IOException { - - if (StateStoreFileBaseImpl.class.isAssignableFrom(driverClass)) { - String workingDirectory = System.getProperty("user.dir"); - File dir = new File(workingDirectory + "/statestore"); - if (dir.exists()) { - FileUtils.cleanDirectory(dir); - } - } - } - - /** - * Set the default configuration for drivers based on files. - * - * @param conf Configuration to extend. - */ - public static void setFileConfiguration(Configuration conf) { - String workingPath = System.getProperty("user.dir"); - String stateStorePath = workingPath + "/statestore"; - conf.set(FEDERATION_STORE_FILE_DIRECTORY, stateStorePath); - } - - /** - * Clear all the records from the State Store. - * - * @param store State Store to remove records from. - * @return If the State Store was cleared. - * @throws IOException If it cannot clear the State Store. - */ - public static boolean clearAllRecords(StateStoreService store) - throws IOException { - Collection<Class<? extends BaseRecord>> allRecords = - store.getSupportedRecords(); - for (Class<? extends BaseRecord> recordType : allRecords) { - if (!clearRecords(store, recordType)) { - return false; - } - } - return true; - } - - /** - * Clear records from a certain type from the State Store. - * - * @param store State Store to remove records from. - * @param recordClass Class of the records to remove. - * @return If the State Store was cleared. - * @throws IOException If it cannot clear the State Store. - */ - public static <T extends BaseRecord> boolean clearRecords( - StateStoreService store, Class<T> recordClass) throws IOException { - List<T> emptyList = new ArrayList<>(); - if (!synchronizeRecords(store, emptyList, recordClass)) { - return false; - } - store.refreshCaches(true); - return true; - } - - /** - * Synchronize a set of records. Remove all and keep the ones specified. - * - * @param stateStore State Store service managing the driver. - * @param records Records to add. 
- * @param clazz Class of the record to synchronize. - * @return If the synchronization succeeded. - * @throws IOException If it cannot connect to the State Store. - */ - public static <T extends BaseRecord> boolean synchronizeRecords( - StateStoreService stateStore, List<T> records, Class<T> clazz) - throws IOException { - StateStoreDriver driver = stateStore.getDriver(); - driver.verifyDriverReady(); - if (driver.removeAll(clazz)) { - if (driver.putAll(records, true, false)) { - return true; - } - } - return false; - } - - public static List<MountTable> createMockMountTable( - List<String> nameservices) throws IOException { - // create table entries - List<MountTable> entries = new ArrayList<>(); - for (String ns : nameservices) { - Map<String, String> destMap = new HashMap<>(); - destMap.put(ns, "/target-" + ns); - MountTable entry = MountTable.newInstance("/" + ns, destMap); - entries.add(entry); - } - return entries; - } - - public static MembershipState createMockRegistrationForNamenode( - String nameserviceId, String namenodeId, - FederationNamenodeServiceState state) throws IOException { - MembershipState entry = MembershipState.newInstance( - "routerId", nameserviceId, namenodeId, "clusterId", "test", - "0.0.0.0:0", "0.0.0.0:0", "0.0.0.0:0", "0.0.0.0:0", state, false); - MembershipStats stats = MembershipStats.newInstance(); - stats.setNumOfActiveDatanodes(100); - stats.setNumOfDeadDatanodes(10); - stats.setNumOfDecommissioningDatanodes(20); - stats.setNumOfDecomActiveDatanodes(15); - stats.setNumOfDecomDeadDatanodes(5); - stats.setNumOfBlocks(10); - entry.setStats(stats); - return entry; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreBase.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreBase.java deleted file mode 100644 index 7f6704e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreBase.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.store; - -import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.newStateStore; -import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; -import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore; -import static org.junit.Assert.assertNotNull; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; - -/** - * Test the basic {@link StateStoreService} {@link MountTableStore} - * functionality. - */ -public class TestStateStoreBase { - - private static StateStoreService stateStore; - private static Configuration conf; - - protected static StateStoreService getStateStore() { - return stateStore; - } - - protected static Configuration getConf() { - return conf; - } - - @BeforeClass - public static void createBase() throws IOException, InterruptedException { - - conf = getStateStoreConfiguration(); - - // Disable auto-reconnect to data store - conf.setLong(DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS, - TimeUnit.HOURS.toMillis(1)); - } - - @AfterClass - public static void destroyBase() throws Exception { - if (stateStore != null) { - stateStore.stop(); - stateStore.close(); - stateStore = null; - } - } - - @Before - public void setupBase() throws IOException, InterruptedException, - InstantiationException, IllegalAccessException { - if (stateStore == null) { - stateStore = newStateStore(conf); - assertNotNull(stateStore); - } - // Wait for state store to connect - stateStore.loadDriver(); - waitStateStore(stateStore, TimeUnit.SECONDS.toMillis(10)); - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java deleted file mode 100644 index 26f081b..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java +++ /dev/null @@ -1,463 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.store; - -import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; -import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES; -import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.ROUTERS; -import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException; -import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords; -import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode; -import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest; -import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; -import org.apache.hadoop.util.Time; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * Test the basic {@link MembershipStore} membership functionality. 
- */ -public class TestStateStoreMembershipState extends TestStateStoreBase { - - private static MembershipStore membershipStore; - - @BeforeClass - public static void create() { - // Reduce expirations to 5 seconds - getConf().setLong( - DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, - TimeUnit.SECONDS.toMillis(5)); - } - - @Before - public void setup() throws IOException, InterruptedException { - - membershipStore = - getStateStore().getRegisteredRecordStore(MembershipStore.class); - - // Clear NN registrations - assertTrue(clearRecords(getStateStore(), MembershipState.class)); - } - - @Test - public void testNamenodeStateOverride() throws Exception { - // Populate the state store - // 1) ns0:nn0 - Standby - String ns = "ns0"; - String nn = "nn0"; - MembershipState report = createRegistration( - ns, nn, ROUTERS[1], FederationNamenodeServiceState.STANDBY); - assertTrue(namenodeHeartbeat(report)); - - // Load data into cache and calculate quorum - assertTrue(getStateStore().loadCache(MembershipStore.class, true)); - - MembershipState existingState = getNamenodeRegistration(ns, nn); - assertEquals( - FederationNamenodeServiceState.STANDBY, existingState.getState()); - - // Override cache - UpdateNamenodeRegistrationRequest request = - UpdateNamenodeRegistrationRequest.newInstance( - ns, nn, FederationNamenodeServiceState.ACTIVE); - assertTrue(membershipStore.updateNamenodeRegistration(request).getResult()); - - MembershipState newState = getNamenodeRegistration(ns, nn); - assertEquals(FederationNamenodeServiceState.ACTIVE, newState.getState()); - } - - @Test - public void testStateStoreDisconnected() throws Exception { - - // Close the data store driver - getStateStore().closeDriver(); - assertFalse(getStateStore().isDriverReady()); - - NamenodeHeartbeatRequest hbRequest = NamenodeHeartbeatRequest.newInstance(); - hbRequest.setNamenodeMembership( - createMockRegistrationForNamenode( - "test", "test", FederationNamenodeServiceState.UNAVAILABLE)); - 
verifyException(membershipStore, "namenodeHeartbeat", - StateStoreUnavailableException.class, - new Class[] {NamenodeHeartbeatRequest.class}, - new Object[] {hbRequest }); - - // Information from cache, no exception should be triggered for these - // TODO - should cached info expire at some point? - GetNamenodeRegistrationsRequest getRequest = - GetNamenodeRegistrationsRequest.newInstance(); - verifyException(membershipStore, - "getNamenodeRegistrations", null, - new Class[] {GetNamenodeRegistrationsRequest.class}, - new Object[] {getRequest}); - - verifyException(membershipStore, - "getExpiredNamenodeRegistrations", null, - new Class[] {GetNamenodeRegistrationsRequest.class}, - new Object[] {getRequest}); - - UpdateNamenodeRegistrationRequest overrideRequest = - UpdateNamenodeRegistrationRequest.newInstance(); - verifyException(membershipStore, - "updateNamenodeRegistration", null, - new Class[] {UpdateNamenodeRegistrationRequest.class}, - new Object[] {overrideRequest}); - } - - private void registerAndLoadRegistrations( - List<MembershipState> registrationList) throws IOException { - // Populate - assertTrue(synchronizeRecords( - getStateStore(), registrationList, MembershipState.class)); - - // Load into cache - assertTrue(getStateStore().loadCache(MembershipStore.class, true)); - } - - private MembershipState createRegistration(String ns, String nn, - String router, FederationNamenodeServiceState state) throws IOException { - MembershipState record = MembershipState.newInstance( - router, ns, - nn, "testcluster", "testblock-" + ns, "testrpc-"+ ns + nn, - "testservice-"+ ns + nn, "testlifeline-"+ ns + nn, - "testweb-" + ns + nn, state, false); - return record; - } - - @Test - public void testRegistrationMajorityQuorum() - throws InterruptedException, IOException { - - // Populate the state store with a set of non-matching elements - // 1) ns0:nn0 - Standby (newest) - // 2) ns0:nn0 - Active (oldest) - // 3) ns0:nn0 - Active (2nd oldest) - // 4) ns0:nn0 - Active 
(3nd oldest element, newest active element) - // Verify the selected entry is the newest majority opinion (4) - String ns = "ns0"; - String nn = "nn0"; - - // Active - oldest - MembershipState report = createRegistration( - ns, nn, ROUTERS[1], FederationNamenodeServiceState.ACTIVE); - assertTrue(namenodeHeartbeat(report)); - Thread.sleep(1000); - - // Active - 2nd oldest - report = createRegistration( - ns, nn, ROUTERS[2], FederationNamenodeServiceState.ACTIVE); - assertTrue(namenodeHeartbeat(report)); - Thread.sleep(1000); - - // Active - 3rd oldest, newest active element - report = createRegistration( - ns, nn, ROUTERS[3], FederationNamenodeServiceState.ACTIVE); - assertTrue(namenodeHeartbeat(report)); - - // standby - newest overall - report = createRegistration( - ns, nn, ROUTERS[0], FederationNamenodeServiceState.STANDBY); - assertTrue(namenodeHeartbeat(report)); - - // Load and calculate quorum - assertTrue(getStateStore().loadCache(MembershipStore.class, true)); - - // Verify quorum entry - MembershipState quorumEntry = getNamenodeRegistration( - report.getNameserviceId(), report.getNamenodeId()); - assertNotNull(quorumEntry); - assertEquals(quorumEntry.getRouterId(), ROUTERS[3]); - } - - @Test - public void testRegistrationQuorumExcludesExpired() - throws InterruptedException, IOException { - - // Populate the state store with some expired entries and verify the expired - // entries are ignored. 
- // 1) ns0:nn0 - Active - // 2) ns0:nn0 - Expired - // 3) ns0:nn0 - Expired - // 4) ns0:nn0 - Expired - // Verify the selected entry is the active entry - List<MembershipState> registrationList = new ArrayList<>(); - String ns = "ns0"; - String nn = "nn0"; - String rpcAddress = "testrpcaddress"; - String serviceAddress = "testserviceaddress"; - String lifelineAddress = "testlifelineaddress"; - String blockPoolId = "testblockpool"; - String clusterId = "testcluster"; - String webAddress = "testwebaddress"; - boolean safemode = false; - - // Active - MembershipState record = MembershipState.newInstance( - ROUTERS[0], ns, nn, clusterId, blockPoolId, - rpcAddress, serviceAddress, lifelineAddress, webAddress, - FederationNamenodeServiceState.ACTIVE, safemode); - registrationList.add(record); - - // Expired - record = MembershipState.newInstance( - ROUTERS[1], ns, nn, clusterId, blockPoolId, - rpcAddress, serviceAddress, lifelineAddress, webAddress, - FederationNamenodeServiceState.EXPIRED, safemode); - registrationList.add(record); - - // Expired - record = MembershipState.newInstance( - ROUTERS[2], ns, nn, clusterId, blockPoolId, - rpcAddress, serviceAddress, lifelineAddress, webAddress, - FederationNamenodeServiceState.EXPIRED, safemode); - registrationList.add(record); - - // Expired - record = MembershipState.newInstance( - ROUTERS[3], ns, nn, clusterId, blockPoolId, - rpcAddress, serviceAddress, lifelineAddress, webAddress, - FederationNamenodeServiceState.EXPIRED, safemode); - registrationList.add(record); - registerAndLoadRegistrations(registrationList); - - // Verify quorum entry chooses active membership - MembershipState quorumEntry = getNamenodeRegistration( - record.getNameserviceId(), record.getNamenodeId()); - assertNotNull(quorumEntry); - assertEquals(ROUTERS[0], quorumEntry.getRouterId()); - } - - @Test - public void testRegistrationQuorumAllExpired() throws IOException { - - // 1) ns0:nn0 - Expired (oldest) - // 2) ns0:nn0 - Expired - // 3) ns0:nn0 - 
Expired - // 4) ns0:nn0 - Expired - // Verify no entry is either selected or cached - List<MembershipState> registrationList = new ArrayList<>(); - String ns = NAMESERVICES[0]; - String nn = NAMENODES[0]; - String rpcAddress = "testrpcaddress"; - String serviceAddress = "testserviceaddress"; - String lifelineAddress = "testlifelineaddress"; - String blockPoolId = "testblockpool"; - String clusterId = "testcluster"; - String webAddress = "testwebaddress"; - boolean safemode = false; - long startingTime = Time.now(); - - // Expired - MembershipState record = MembershipState.newInstance( - ROUTERS[0], ns, nn, clusterId, blockPoolId, - rpcAddress, webAddress, lifelineAddress, webAddress, - FederationNamenodeServiceState.EXPIRED, safemode); - record.setDateModified(startingTime - 10000); - registrationList.add(record); - - // Expired - record = MembershipState.newInstance( - ROUTERS[1], ns, nn, clusterId, blockPoolId, - rpcAddress, serviceAddress, lifelineAddress, webAddress, - FederationNamenodeServiceState.EXPIRED, safemode); - record.setDateModified(startingTime); - registrationList.add(record); - - // Expired - record = MembershipState.newInstance( - ROUTERS[2], ns, nn, clusterId, blockPoolId, - rpcAddress, serviceAddress, lifelineAddress, webAddress, - FederationNamenodeServiceState.EXPIRED, safemode); - record.setDateModified(startingTime); - registrationList.add(record); - - // Expired - record = MembershipState.newInstance( - ROUTERS[3], ns, nn, clusterId, blockPoolId, - rpcAddress, serviceAddress, lifelineAddress, webAddress, - FederationNamenodeServiceState.EXPIRED, safemode); - record.setDateModified(startingTime); - registrationList.add(record); - - registerAndLoadRegistrations(registrationList); - - // Verify no entry is found for this nameservice - assertNull(getNamenodeRegistration( - record.getNameserviceId(), record.getNamenodeId())); - } - - @Test - public void testRegistrationNoQuorum() - throws InterruptedException, IOException { - - // Populate the 
state store with a set of non-matching elements - // 1) ns0:nn0 - Standby (newest) - // 2) ns0:nn0 - Standby (oldest) - // 3) ns0:nn0 - Active (2nd oldest) - // 4) ns0:nn0 - Active (3nd oldest element, newest active element) - // Verify the selected entry is the newest entry (1) - MembershipState report1 = createRegistration( - NAMESERVICES[0], NAMENODES[0], ROUTERS[1], - FederationNamenodeServiceState.STANDBY); - assertTrue(namenodeHeartbeat(report1)); - Thread.sleep(100); - MembershipState report2 = createRegistration( - NAMESERVICES[0], NAMENODES[0], ROUTERS[2], - FederationNamenodeServiceState.ACTIVE); - assertTrue(namenodeHeartbeat(report2)); - Thread.sleep(100); - MembershipState report3 = createRegistration( - NAMESERVICES[0], NAMENODES[0], ROUTERS[3], - FederationNamenodeServiceState.ACTIVE); - assertTrue(namenodeHeartbeat(report3)); - Thread.sleep(100); - MembershipState report4 = createRegistration( - NAMESERVICES[0], NAMENODES[0], ROUTERS[0], - FederationNamenodeServiceState.STANDBY); - assertTrue(namenodeHeartbeat(report4)); - - // Load and calculate quorum - assertTrue(getStateStore().loadCache(MembershipStore.class, true)); - - // Verify quorum entry uses the newest data, even though it is standby - MembershipState quorumEntry = getNamenodeRegistration( - report1.getNameserviceId(), report1.getNamenodeId()); - assertNotNull(quorumEntry); - assertEquals(ROUTERS[0], quorumEntry.getRouterId()); - assertEquals( - FederationNamenodeServiceState.STANDBY, quorumEntry.getState()); - } - - @Test - public void testRegistrationExpired() - throws InterruptedException, IOException { - - // Populate the state store with a single NN element - // 1) ns0:nn0 - Active - // Wait for the entry to expire without heartbeating - // Verify the NN entry is populated as EXPIRED internally in the state store - - MembershipState report = createRegistration( - NAMESERVICES[0], NAMENODES[0], ROUTERS[0], - FederationNamenodeServiceState.ACTIVE); - 
assertTrue(namenodeHeartbeat(report)); - - // Load cache - assertTrue(getStateStore().loadCache(MembershipStore.class, true)); - - // Verify quorum and entry - MembershipState quorumEntry = getNamenodeRegistration( - report.getNameserviceId(), report.getNamenodeId()); - assertNotNull(quorumEntry); - assertEquals(ROUTERS[0], quorumEntry.getRouterId()); - assertEquals(FederationNamenodeServiceState.ACTIVE, quorumEntry.getState()); - - // Wait past expiration (set in conf to 5 seconds) - Thread.sleep(6000); - // Reload cache - assertTrue(getStateStore().loadCache(MembershipStore.class, true)); - - // Verify entry is now expired and is no longer in the cache - quorumEntry = getNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]); - assertNull(quorumEntry); - - // Verify entry is now expired and can't be used by RPC service - quorumEntry = getNamenodeRegistration( - report.getNameserviceId(), report.getNamenodeId()); - assertNull(quorumEntry); - - // Heartbeat again, updates dateModified - assertTrue(namenodeHeartbeat(report)); - // Reload cache - assertTrue(getStateStore().loadCache(MembershipStore.class, true)); - - // Verify updated entry marked as active and is accessible to RPC server - quorumEntry = getNamenodeRegistration( - report.getNameserviceId(), report.getNamenodeId()); - assertNotNull(quorumEntry); - assertEquals(ROUTERS[0], quorumEntry.getRouterId()); - assertEquals(FederationNamenodeServiceState.ACTIVE, quorumEntry.getState()); - } - - /** - * Get a single namenode membership record from the store. - * - * @param nsId The HDFS nameservice ID to search for - * @param nnId The HDFS namenode ID to search for - * @return The single NamenodeMembershipRecord that matches the query or null - * if not found. - * @throws IOException if the query could not be executed. 
- */ - private MembershipState getNamenodeRegistration( - final String nsId, final String nnId) throws IOException { - - MembershipState partial = MembershipState.newInstance(); - partial.setNameserviceId(nsId); - partial.setNamenodeId(nnId); - GetNamenodeRegistrationsRequest request = - GetNamenodeRegistrationsRequest.newInstance(partial); - GetNamenodeRegistrationsResponse response = - membershipStore.getNamenodeRegistrations(request); - - List<MembershipState> results = response.getNamenodeMemberships(); - if (results != null && results.size() == 1) { - MembershipState record = results.get(0); - return record; - } - return null; - } - - /** - * Register a namenode heartbeat with the state store. - * - * @param store FederationMembershipStateStore instance to retrieve the - * membership data records. - * @param namenode A fully populated namenode membership record to be - * committed to the data store. - * @return True if successful, false otherwise. - * @throws IOException if the state store query could not be performed. 
- */ - private boolean namenodeHeartbeat(MembershipState namenode) - throws IOException { - - NamenodeHeartbeatRequest request = - NamenodeHeartbeatRequest.newInstance(namenode); - NamenodeHeartbeatResponse response = - membershipStore.namenodeHeartbeat(request); - return response.getResult(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java deleted file mode 100644 index d30d6ba..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java +++ /dev/null @@ -1,250 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.store; - -import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES; -import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException; -import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords; -import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockMountTable; -import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; -import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; -import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * Test the basic {@link StateStoreService} - * {@link MountTableStore} functionality. 
- */ -public class TestStateStoreMountTable extends TestStateStoreBase { - - private static List<String> nameservices; - private static MountTableStore mountStore; - - @BeforeClass - public static void create() throws IOException { - nameservices = new ArrayList<>(); - nameservices.add(NAMESERVICES[0]); - nameservices.add(NAMESERVICES[1]); - } - - @Before - public void setup() throws IOException, InterruptedException { - mountStore = - getStateStore().getRegisteredRecordStore(MountTableStore.class); - // Clear Mount table registrations - assertTrue(clearRecords(getStateStore(), MountTable.class)); - } - - @Test - public void testStateStoreDisconnected() throws Exception { - - // Close the data store driver - getStateStore().closeDriver(); - assertFalse(getStateStore().isDriverReady()); - - // Test APIs that access the store to check they throw the correct exception - AddMountTableEntryRequest addRequest = - AddMountTableEntryRequest.newInstance(); - verifyException(mountStore, "addMountTableEntry", - StateStoreUnavailableException.class, - new Class[] {AddMountTableEntryRequest.class}, - new Object[] {addRequest}); - - UpdateMountTableEntryRequest updateRequest = - UpdateMountTableEntryRequest.newInstance(); - verifyException(mountStore, "updateMountTableEntry", - StateStoreUnavailableException.class, - new Class[] {UpdateMountTableEntryRequest.class}, - new Object[] {updateRequest}); - - RemoveMountTableEntryRequest removeRequest = - RemoveMountTableEntryRequest.newInstance(); - verifyException(mountStore, "removeMountTableEntry", - StateStoreUnavailableException.class, - new Class[] {RemoveMountTableEntryRequest.class}, - new Object[] {removeRequest}); - - GetMountTableEntriesRequest getRequest = - GetMountTableEntriesRequest.newInstance(); - mountStore.loadCache(true); - verifyException(mountStore, "getMountTableEntries", - StateStoreUnavailableException.class, - new Class[] {GetMountTableEntriesRequest.class}, - new Object[] {getRequest}); - } - - @Test - 
public void testSynchronizeMountTable() throws IOException { - // Synchronize and get mount table entries - List<MountTable> entries = createMockMountTable(nameservices); - assertTrue(synchronizeRecords(getStateStore(), entries, MountTable.class)); - for (MountTable e : entries) { - mountStore.loadCache(true); - MountTable entry = getMountTableEntry(e.getSourcePath()); - assertNotNull(entry); - assertEquals(e.getDefaultLocation().getDest(), - entry.getDefaultLocation().getDest()); - } - } - - @Test - public void testAddMountTableEntry() throws IOException { - - // Add 1 - List<MountTable> entries = createMockMountTable(nameservices); - List<MountTable> entries1 = getMountTableEntries("/").getRecords(); - assertEquals(0, entries1.size()); - MountTable entry0 = entries.get(0); - AddMountTableEntryRequest request = - AddMountTableEntryRequest.newInstance(entry0); - AddMountTableEntryResponse response = - mountStore.addMountTableEntry(request); - assertTrue(response.getStatus()); - - mountStore.loadCache(true); - List<MountTable> entries2 = getMountTableEntries("/").getRecords(); - assertEquals(1, entries2.size()); - } - - @Test - public void testRemoveMountTableEntry() throws IOException { - - // Add many - List<MountTable> entries = createMockMountTable(nameservices); - synchronizeRecords(getStateStore(), entries, MountTable.class); - mountStore.loadCache(true); - List<MountTable> entries1 = getMountTableEntries("/").getRecords(); - assertEquals(entries.size(), entries1.size()); - - // Remove 1 - RemoveMountTableEntryRequest request = - RemoveMountTableEntryRequest.newInstance(); - request.setSrcPath(entries.get(0).getSourcePath()); - assertTrue(mountStore.removeMountTableEntry(request).getStatus()); - - // Verify remove - mountStore.loadCache(true); - List<MountTable> entries2 = getMountTableEntries("/").getRecords(); - assertEquals(entries.size() - 1, entries2.size()); - } - - @Test - public void testUpdateMountTableEntry() throws IOException { - - // Add 1 - 
List<MountTable> entries = createMockMountTable(nameservices); - MountTable entry0 = entries.get(0); - String srcPath = entry0.getSourcePath(); - String nsId = entry0.getDefaultLocation().getNameserviceId(); - AddMountTableEntryRequest request = - AddMountTableEntryRequest.newInstance(entry0); - AddMountTableEntryResponse response = - mountStore.addMountTableEntry(request); - assertTrue(response.getStatus()); - - // Verify - mountStore.loadCache(true); - MountTable matchingEntry0 = getMountTableEntry(srcPath); - assertNotNull(matchingEntry0); - assertEquals(nsId, matchingEntry0.getDefaultLocation().getNameserviceId()); - - // Edit destination nameservice for source path - Map<String, String> destMap = - Collections.singletonMap("testnameservice", "/"); - MountTable replacement = - MountTable.newInstance(srcPath, destMap); - UpdateMountTableEntryRequest updateRequest = - UpdateMountTableEntryRequest.newInstance(replacement); - UpdateMountTableEntryResponse updateResponse = - mountStore.updateMountTableEntry(updateRequest); - assertTrue(updateResponse.getStatus()); - - // Verify - mountStore.loadCache(true); - MountTable matchingEntry1 = getMountTableEntry(srcPath); - assertNotNull(matchingEntry1); - assertEquals("testnameservice", - matchingEntry1.getDefaultLocation().getNameserviceId()); - } - - /** - * Gets an existing mount table record in the state store. - * - * @param mount The mount point of the record to remove. - * @return The matching record if found, null if it is not found. - * @throws IOException If the state store could not be accessed. 
- */ - private MountTable getMountTableEntry(String mount) throws IOException { - GetMountTableEntriesRequest request = - GetMountTableEntriesRequest.newInstance(mount); - GetMountTableEntriesResponse response = - mountStore.getMountTableEntries(request); - List<MountTable> results = response.getEntries(); - if (results.size() > 0) { - // First result is sorted to have the shortest mount string length - return results.get(0); - } - return null; - } - - /** - * Fetch all mount table records beneath a root path. - * - * @param store FederationMountTableStore instance to commit the data. - * @param mount The root search path, enter "/" to return all mount table - * records. - * - * @return A list of all mount table records found below the root mount. - * - * @throws IOException If the state store could not be accessed. - */ - private QueryResult<MountTable> getMountTableEntries(String mount) - throws IOException { - if (mount == null) { - throw new IOException("Please specify a root search path"); - } - GetMountTableEntriesRequest request = - GetMountTableEntriesRequest.newInstance(); - request.setSrcPath(mount); - GetMountTableEntriesResponse response = - mountStore.getMountTableEntries(request); - List<MountTable> records = response.getEntries(); - long timestamp = response.getTimestamp(); - return new QueryResult<MountTable>(records, timestamp); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java deleted file mode 100644 index cbc5e7d..0000000 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store; - -import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException; -import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; -import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest; -import 
org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest; -import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; -import org.apache.hadoop.util.Time; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * Test the basic {@link StateStoreService} {@link RouterStore} functionality. - */ -public class TestStateStoreRouterState extends TestStateStoreBase { - - private static RouterStore routerStore; - - @BeforeClass - public static void create() { - // Reduce expirations to 5 seconds - getConf().setTimeDuration( - DFSConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS, - 5, TimeUnit.SECONDS); - } - - @Before - public void setup() throws IOException, InterruptedException { - - if (routerStore == null) { - routerStore = - getStateStore().getRegisteredRecordStore(RouterStore.class); - } - - // Clear router status registrations - assertTrue(clearRecords(getStateStore(), RouterState.class)); - } - - @Test - public void testStateStoreDisconnected() throws Exception { - - // Close the data store driver - getStateStore().closeDriver(); - assertEquals(false, getStateStore().isDriverReady()); - - // Test all APIs that access the data store to ensure they throw the correct - // exception. 
- GetRouterRegistrationRequest getSingleRequest = - GetRouterRegistrationRequest.newInstance(); - verifyException(routerStore, "getRouterRegistration", - StateStoreUnavailableException.class, - new Class[] {GetRouterRegistrationRequest.class}, - new Object[] {getSingleRequest}); - - GetRouterRegistrationsRequest getRequest = - GetRouterRegistrationsRequest.newInstance(); - routerStore.loadCache(true); - verifyException(routerStore, "getRouterRegistrations", - StateStoreUnavailableException.class, - new Class[] {GetRouterRegistrationsRequest.class}, - new Object[] {getRequest}); - - RouterHeartbeatRequest hbRequest = RouterHeartbeatRequest.newInstance( - RouterState.newInstance("test", 0, RouterServiceState.UNINITIALIZED)); - verifyException(routerStore, "routerHeartbeat", - StateStoreUnavailableException.class, - new Class[] {RouterHeartbeatRequest.class}, - new Object[] {hbRequest}); - } - - // - // Router - // - @Test - public void testUpdateRouterStatus() - throws IllegalStateException, IOException { - - long dateStarted = Time.now(); - String address = "testaddress"; - - // Set - RouterHeartbeatRequest request = RouterHeartbeatRequest.newInstance( - RouterState.newInstance( - address, dateStarted, RouterServiceState.RUNNING)); - assertTrue(routerStore.routerHeartbeat(request).getStatus()); - - // Verify - GetRouterRegistrationRequest getRequest = - GetRouterRegistrationRequest.newInstance(address); - RouterState record = - routerStore.getRouterRegistration(getRequest).getRouter(); - assertNotNull(record); - assertEquals(RouterServiceState.RUNNING, record.getStatus()); - assertEquals(address, record.getAddress()); - assertEquals(FederationUtil.getCompileInfo(), record.getCompileInfo()); - // Build version may vary a bit - assertFalse(record.getVersion().isEmpty()); - } - - @Test - public void testRouterStateExpired() - throws IOException, InterruptedException { - - long dateStarted = Time.now(); - String address = "testaddress"; - - RouterHeartbeatRequest 
request = RouterHeartbeatRequest.newInstance( - RouterState.newInstance( - address, dateStarted, RouterServiceState.RUNNING)); - // Set - assertTrue(routerStore.routerHeartbeat(request).getStatus()); - - // Verify - GetRouterRegistrationRequest getRequest = - GetRouterRegistrationRequest.newInstance(address); - RouterState record = - routerStore.getRouterRegistration(getRequest).getRouter(); - assertNotNull(record); - - // Wait past expiration (set to 5 sec in config) - Thread.sleep(6000); - - // Verify expired - RouterState r = routerStore.getRouterRegistration(getRequest).getRouter(); - assertEquals(RouterServiceState.EXPIRED, r.getStatus()); - - // Heartbeat again and this shouldn't be EXPIRED anymore - assertTrue(routerStore.routerHeartbeat(request).getStatus()); - r = routerStore.getRouterRegistration(getRequest).getRouter(); - assertEquals(RouterServiceState.RUNNING, r.getStatus()); - } - - @Test - public void testGetAllRouterStates() - throws StateStoreUnavailableException, IOException { - - // Set 2 entries - RouterHeartbeatRequest heartbeatRequest1 = - RouterHeartbeatRequest.newInstance( - RouterState.newInstance( - "testaddress1", Time.now(), RouterServiceState.RUNNING)); - assertTrue(routerStore.routerHeartbeat(heartbeatRequest1).getStatus()); - - RouterHeartbeatRequest heartbeatRequest2 = - RouterHeartbeatRequest.newInstance( - RouterState.newInstance( - "testaddress2", Time.now(), RouterServiceState.RUNNING)); - assertTrue(routerStore.routerHeartbeat(heartbeatRequest2).getStatus()); - - // Verify - routerStore.loadCache(true); - GetRouterRegistrationsRequest request = - GetRouterRegistrationsRequest.newInstance(); - List<RouterState> entries = - routerStore.getRouterRegistrations(request).getRouters(); - assertEquals(2, entries.size()); - Collections.sort(entries); - assertEquals("testaddress1", entries.get(0).getAddress()); - assertEquals("testaddress2", entries.get(1).getAddress()); - assertEquals(RouterServiceState.RUNNING, 
entries.get(0).getStatus()); - assertEquals(RouterServiceState.RUNNING, entries.get(1).getStatus()); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
