Author: arp
Date: Mon Aug 26 03:18:40 2013
New Revision: 1517417

URL: http://svn.apache.org/r1517417
Log:
HDFS-5000. DataNode configuration should allow specifying storage type
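With this change, each entry in dfs.datanode.data.dir may carry an optional, case-insensitive storage-type prefix such as [disk] or [ssd]; entries without a prefix keep the default type (DISK). A minimal usage sketch, mirroring the new TestDataDirs case at the end of this message (the driver class is hypothetical; it sits in the datanode package only because getStorageLocations() is package-private):

    package org.apache.hadoop.hdfs.server.datanode;

    import java.util.ArrayList;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    // Hypothetical driver; not part of this commit.
    public class StorageTypeConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Optional storage-type prefixes; the bare /data/dir2 defaults to DISK.
        conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
            "[DISK]/data/dir0,[SSD]/data/dir1,/data/dir2");
        ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>(
            DataNode.getStorageLocations(conf));
        for (StorageLocation location : locations) {
          System.out.println(location.getStorageType() + " -> " + location.getUri());
        }
      }
    }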
Modified:
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataDirs.java

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1517417&r1=1517416&r2=1517417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Mon Aug 26 03:18:40 2013
@@ -460,9 +460,9 @@ class BPServiceActor implements Runnable
   }
 
   private String formatThreadName() {
-    Collection<URI> dataDirs = DataNode.getStorageDirs(dn.getConf());
-    return "DataNode: [" +
-        StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "] " +
+    Collection<StorageLocation> dataDirs =
+        DataNode.getStorageLocations(dn.getConf());
+    return "DataNode: [" + dataDirs.toString() + "] " +
       " heartbeating to " + nnAddr;
   }
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1517417&r1=1517416&r2=1517417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Aug 26 03:18:40 2013
@@ -272,7 +272,7 @@ public class DataNode extends Configured
   private JvmPauseMonitor pauseMonitor;
   private SecureResources secureResources = null;
 
-  private AbstractList<File> dataDirs;
+  private AbstractList<StorageLocation> dataDirs;
   private Configuration conf;
 
   private final List<String> usersWithLocalPathAccess;
@@ -281,20 +281,11 @@ public class DataNode extends Configured
   private final boolean getHdfsBlockLocationsEnabled;
 
   /**
-   * Create the DataNode given a configuration and an array of dataDirs.
-   * 'dataDirs' is where the blocks are stored.
-   */
-  DataNode(final Configuration conf,
-           final AbstractList<File> dataDirs) throws IOException {
-    this(conf, dataDirs, null);
-  }
-
-  /**
    * Create the DataNode given a configuration, an array of dataDirs,
    * and a namenode proxy
    */
-  DataNode(final Configuration conf,
-           final AbstractList<File> dataDirs,
+  DataNode(final Configuration conf,
+           final AbstractList<StorageLocation> dataDirs,
            final SecureResources resources) throws IOException {
     super(conf);
@@ -711,7 +702,7 @@ public class DataNode extends Configured
    * @throws IOException
    */
   void startDataNode(Configuration conf,
-                     AbstractList<File> dataDirs,
+                     AbstractList<StorageLocation> dataDirs,
                     // DatanodeProtocol namenode,
                      SecureResources resources
                      ) throws IOException {
@@ -861,7 +852,7 @@ public class DataNode extends Configured
    * If this is the first block pool to register, this also initializes
    * the datanode-scoped storage.
    *
-   * @param nsInfo the handshake response from the NN.
+   * @param bpos block pool to initialize and register with the NameNode.
    * @throws IOException if the NN is inconsistent with the local storage.
    */
   void initBlockPool(BPOfferService bpos) throws IOException {
@@ -1688,17 +1679,39 @@ public class DataNode extends Configured
       printUsage(System.err);
       return null;
     }
-    Collection<URI> dataDirs = getStorageDirs(conf);
+    Collection<StorageLocation> dataLocations = getStorageLocations(conf);
     UserGroupInformation.setConfiguration(conf);
     SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
         DFS_DATANODE_USER_NAME_KEY);
-    return makeInstance(dataDirs, conf, resources);
+    return makeInstance(dataLocations, conf, resources);
+  }
+
+  static Collection<StorageLocation> parseStorageLocations(
+      Collection<String> rawLocations) {
+    List<StorageLocation> locations =
+        new ArrayList<StorageLocation>(rawLocations.size());
+
+    for(String locationString : rawLocations) {
+      StorageLocation location;
+      try {
+        location = StorageLocation.parse(locationString);
+      } catch (IOException ioe) {
+        LOG.error("Failed to parse storage location " + locationString);
+        continue;
+      } catch (IllegalArgumentException iae) {
+        LOG.error(iae.toString());
+        continue;
+      }
+
+      locations.add(location);
+    }
+
+    return locations;
   }
 
-  static Collection<URI> getStorageDirs(Configuration conf) {
-    Collection<String> dirNames =
-      conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
-    return Util.stringCollectionAsURIs(dirNames);
+  static Collection<StorageLocation> getStorageLocations(Configuration conf) {
+    return parseStorageLocations(
+        conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY));
   }
 
   /** Instantiate & Start a single datanode daemon and wait for it to finish.
@@ -1764,51 +1777,45 @@ public class DataNode extends Configured
    * no directory from this directory list can be created.
    * @throws IOException
    */
-  static DataNode makeInstance(Collection<URI> dataDirs, Configuration conf,
-      SecureResources resources) throws IOException {
+  static DataNode makeInstance(Collection<StorageLocation> dataDirs,
+      Configuration conf, SecureResources resources) throws IOException {
     LocalFileSystem localFS = FileSystem.getLocal(conf);
     FsPermission permission = new FsPermission(
         conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
                  DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
     DataNodeDiskChecker dataNodeDiskChecker =
         new DataNodeDiskChecker(permission);
-    ArrayList<File> dirs =
-        getDataDirsFromURIs(dataDirs, localFS, dataNodeDiskChecker);
+    ArrayList<StorageLocation> locations =
+        checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
     DefaultMetricsSystem.initialize("DataNode");
 
-    assert dirs.size() > 0 : "number of data directories should be > 0";
-    return new DataNode(conf, dirs, resources);
+    assert locations.size() > 0 : "number of data directories should be > 0";
+    return new DataNode(conf, locations, resources);
   }
 
   // DataNode ctor expects AbstractList instead of List or Collection...
-  static ArrayList<File> getDataDirsFromURIs(Collection<URI> dataDirs,
+  static ArrayList<StorageLocation> checkStorageLocations(
+      Collection<StorageLocation> dataDirs,
       LocalFileSystem localFS, DataNodeDiskChecker dataNodeDiskChecker)
       throws IOException {
-    ArrayList<File> dirs = new ArrayList<File>();
+    ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
     StringBuilder invalidDirs = new StringBuilder();
-    for (URI dirURI : dataDirs) {
-      if (!"file".equalsIgnoreCase(dirURI.getScheme())) {
-        LOG.warn("Unsupported URI schema in " + dirURI + ". Ignoring ...");
-        invalidDirs.append("\"").append(dirURI).append("\" ");
-        continue;
-      }
-      // drop any (illegal) authority in the URI for backwards compatibility
-      File dir = new File(dirURI.getPath());
+    for (StorageLocation location : dataDirs) {
       try {
-        dataNodeDiskChecker.checkDir(localFS, new Path(dir.toURI()));
-        dirs.add(dir);
+        dataNodeDiskChecker.checkDir(localFS, new Path(location.getUri()));
+        locations.add(location);
       } catch (IOException ioe) {
        LOG.warn("Invalid " + DFS_DATANODE_DATA_DIR_KEY + " "
-            + dir + " : ", ioe);
-        invalidDirs.append("\"").append(dir.getCanonicalPath()).append("\" ");
+            + location.getFile() + " : ", ioe);
+        invalidDirs.append("\"").append(location.getFile().getCanonicalPath()).append("\" ");
       }
     }
-    if (dirs.size() == 0) {
+    if (locations.size() == 0) {
       throw new IOException("All directories in "
           + DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
          + invalidDirs);
     }
-    return dirs;
+    return locations;
   }
 
   @Override
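Note that the StorageLocation class referenced above is added in a new file that does not appear in this diff. Below is a minimal sketch of the parse() behavior implied by the new TestDataDirs case at the end of this message; the regex, the stand-in enum, and the silent fallback to DISK for an unrecognized type are assumptions, not the committed implementation:

    import java.io.File;
    import java.io.IOException;
    import java.net.URI;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    class StorageLocationSketch {
      // Stand-in for org.apache.hadoop.hdfs.StorageType.
      enum StorageType { DISK, SSD }

      // An optional "[type]" prefix followed by the directory path.
      private static final Pattern REGEX = Pattern.compile("^\\[(\\w*)\\](.+)$");

      final StorageType storageType;
      final URI uri;

      private StorageLocationSketch(StorageType storageType, URI uri) {
        this.storageType = storageType;
        this.uri = uri;
      }

      static StorageLocationSketch parse(String rawLocation) throws IOException {
        StorageType type = StorageType.DISK;  // default when no prefix is given
        String path = rawLocation;
        Matcher matcher = REGEX.matcher(rawLocation);
        if (matcher.matches()) {
          path = matcher.group(2);
          try {
            // Case-insensitive, per testDataDirParsing below.
            type = StorageType.valueOf(matcher.group(1).toUpperCase());
          } catch (IllegalArgumentException iae) {
            // An unrecognized type such as [BadMediaType] is ignored and the
            // default kept, matching the second case in testDataDirParsing.
          }
        }
        return new StorageLocationSketch(type, new File(path).toURI());
      }
    }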
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1517417&r1=1517416&r2=1517417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Mon Aug 26 03:18:40 2013
@@ -129,7 +129,8 @@ public class DataStorage extends Storage
    * @throws IOException
    */
   synchronized void recoverTransitionRead(DataNode datanode,
-      NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt)
+      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+      StartupOption startOpt)
       throws IOException {
     if (initialized) {
       // DN storage has been initialized, no need to do anything
@@ -145,8 +146,8 @@ public class DataStorage extends Storage
     // Format and recover.
     this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
     ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
-    for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
-      File dataDir = it.next();
+    for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
+      File dataDir = it.next().getFile();
       StorageDirectory sd = new StorageDirectory(dataDir);
       StorageState curState;
       try {
@@ -215,14 +216,14 @@ public class DataStorage extends Storage
    * @throws IOException on error
    */
   void recoverTransitionRead(DataNode datanode, String bpID, NamespaceInfo nsInfo,
-      Collection<File> dataDirs, StartupOption startOpt) throws IOException {
+      Collection<StorageLocation> dataDirs, StartupOption startOpt) throws IOException {
     // First ensure datanode level format/snapshot/rollback is completed
     recoverTransitionRead(datanode, nsInfo, dataDirs, startOpt);
-    
+
     // Create list of storage directories for the block pool
     Collection<File> bpDataDirs = new ArrayList<File>();
-    for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
-      File dnRoot = it.next();
+    for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
+      File dnRoot = it.next().getFile();
       File bpRoot = BlockPoolSliceStorage.getBpRoot(bpID, new File(dnRoot,
           STORAGE_DIR_CURRENT));
       bpDataDirs.add(bpRoot);
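For reference, getFile() resolves a StorageLocation back to its local directory, and the block-pool root is then derived under that directory's current/ subtree. A small sketch with a made-up block pool ID, assuming BlockPoolSliceStorage.getBpRoot() resolves to <dataDir>/current/<bpID> as elsewhere in this branch:

    import java.io.File;

    public class BpRootSketch {
      public static void main(String[] args) {
        File dnRoot = new File("/data/dir0");  // what location.getFile() returns
        String bpID = "BP-1234567890-10.0.0.1-1376000000000";  // hypothetical
        // Mirrors BlockPoolSliceStorage.getBpRoot(bpID,
        // new File(dnRoot, STORAGE_DIR_CURRENT)) under the stated assumption.
        File bpRoot = new File(new File(dnRoot, "current"), bpID);
        System.out.println(bpRoot.getPath());  // prints /data/dir0/current/BP-...
      }
    }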
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java?rev=1517417&r1=1517416&r2=1517417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java Mon Aug 26 03:18:40 2013
@@ -35,6 +35,8 @@ import static org.mockito.Mockito.when;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -44,18 +46,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -80,10 +73,7 @@ import org.apache.hadoop.test.GenericTes
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.log4j.Level;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -121,7 +111,7 @@ public class TestBlockRecovery {
    * @throws IOException
    */
   @Before
-  public void startUp() throws IOException {
+  public void startUp() throws IOException, URISyntaxException {
     tearDownDone = false;
     conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
@@ -131,11 +121,12 @@ public class TestBlockRecovery {
     conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
     FileSystem.setDefaultUri(conf,
         "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
-    ArrayList<File> dirs = new ArrayList<File>();
+    ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
     File dataDir = new File(DATA_DIR);
     FileUtil.fullyDelete(dataDir);
     dataDir.mkdirs();
-    dirs.add(dataDir);
+    StorageLocation location = new StorageLocation(new URI(dataDir.getPath()));
+    locations.add(location);
 
     final DatanodeProtocolClientSideTranslatorPB namenode =
       mock(DatanodeProtocolClientSideTranslatorPB.class);
@@ -161,7 +152,7 @@ public class TestBlockRecovery {
         new DatanodeCommand[0],
         new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1)));
 
-    dn = new DataNode(conf, dirs, null) {
+    dn = new DataNode(conf, locations, null) {
       @Override
       DatanodeProtocolClientSideTranslatorPB connectToNN(
           InetSocketAddress nnAddr) throws IOException {
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataDirs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataDirs.java?rev=1517417&r1=1517416&r2=1517417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataDirs.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataDirs.java Mon Aug 26 03:18:40 2013
@@ -20,11 +20,14 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.*;
 import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.StorageType;
 import org.junit.Test;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
@@ -34,19 +37,71 @@ import org.apache.hadoop.hdfs.server.dat
 
 public class TestDataDirs {
 
-  @Test (timeout = 10000)
-  public void testGetDataDirsFromURIs() throws Throwable {
+  @Test (timeout = 30000)
+  public void testDataDirParsing() throws Throwable {
+    Configuration conf = new Configuration();
+    ArrayList<StorageLocation> locations;
+    File dir0 = new File("/dir0");
+    File dir1 = new File("/dir1");
+    File dir2 = new File("/dir2");
+    File dir3 = new File("/dir3");
+
+    // Verify that a valid string is correctly parsed, and that storage
+    // type is not case-sensitive
+    String locations1 = "[disk]/dir0,[DISK]/dir1,[sSd]/dir2,[disK]/dir3";
+    conf.set(DFS_DATANODE_DATA_DIR_KEY, locations1);
+    locations = new ArrayList<StorageLocation>(DataNode.getStorageLocations(conf));
+    assertThat(locations.size(), is(4));
+    assertThat(locations.get(0).getStorageType(), is(StorageType.DISK));
+    assertThat(locations.get(0).getUri(), is(dir0.toURI()));
+    assertThat(locations.get(1).getStorageType(), is(StorageType.DISK));
+    assertThat(locations.get(1).getUri(), is(dir1.toURI()));
+    assertThat(locations.get(2).getStorageType(), is(StorageType.SSD));
+    assertThat(locations.get(2).getUri(), is(dir2.toURI()));
+    assertThat(locations.get(3).getStorageType(), is(StorageType.DISK));
+    assertThat(locations.get(3).getUri(), is(dir3.toURI()));
+
+    // Verify that an unrecognized storage type is ignored.
+    String locations2 = "[BadMediaType]/dir0,[ssd]/dir1,[disk]/dir2";
+    conf.set(DFS_DATANODE_DATA_DIR_KEY, locations2);
+    locations = new ArrayList<StorageLocation>(DataNode.getStorageLocations(conf));
+    assertThat(locations.size(), is(3));
+    assertThat(locations.get(0).getStorageType(), is(StorageType.DISK));
+    assertThat(locations.get(0).getUri(), is(dir0.toURI()));
+    assertThat(locations.get(1).getStorageType(), is(StorageType.SSD));
+    assertThat(locations.get(1).getUri(), is(dir1.toURI()));
+    assertThat(locations.get(2).getStorageType(), is(StorageType.DISK));
+    assertThat(locations.get(2).getUri(), is(dir2.toURI()));
+
+    // Assert that a string with no storage type specified is
+    // correctly parsed and the default storage type is picked up.
+    String locations3 = "/dir0,/dir1";
+    conf.set(DFS_DATANODE_DATA_DIR_KEY, locations3);
+    locations = new ArrayList<StorageLocation>(DataNode.getStorageLocations(conf));
+    assertThat(locations.size(), is(2));
+    assertThat(locations.get(0).getStorageType(), is(StorageType.DISK));
+    assertThat(locations.get(0).getUri(), is(dir0.toURI()));
+    assertThat(locations.get(1).getStorageType(), is(StorageType.DISK));
+    assertThat(locations.get(1).getUri(), is(dir1.toURI()));
+  }
+
File("/dir1"); + File dir2 = new File("/dir2"); + File dir3 = new File("/dir3"); + + // Verify that a valid string is correctly parsed, and that storage + // type is not case-sensitive + String locations1 = "[disk]/dir0,[DISK]/dir1,[sSd]/dir2,[disK]/dir3"; + conf.set(DFS_DATANODE_DATA_DIR_KEY, locations1); + locations = new ArrayList<StorageLocation>(DataNode.getStorageLocations(conf)); + assertThat(locations.size(), is(4)); + assertThat(locations.get(0).getStorageType(), is(StorageType.DISK)); + assertThat(locations.get(0).getUri(), is(dir0.toURI())); + assertThat(locations.get(1).getStorageType(), is(StorageType.DISK)); + assertThat(locations.get(1).getUri(), is(dir1.toURI())); + assertThat(locations.get(2).getStorageType(), is(StorageType.SSD)); + assertThat(locations.get(2).getUri(), is(dir2.toURI())); + assertThat(locations.get(3).getStorageType(), is(StorageType.DISK)); + assertThat(locations.get(3).getUri(), is(dir3.toURI())); + + // Verify that an unrecognized storage type is ignored. + String locations2 = "[BadMediaType]/dir0,[ssd]/dir1,[disk]/dir2"; + conf.set(DFS_DATANODE_DATA_DIR_KEY, locations2); + locations = new ArrayList<StorageLocation>(DataNode.getStorageLocations(conf)); + assertThat(locations.size(), is(3)); + assertThat(locations.get(0).getStorageType(), is(StorageType.DISK)); + assertThat(locations.get(0).getUri(), is(dir0.toURI())); + assertThat(locations.get(1).getStorageType(), is(StorageType.SSD)); + assertThat(locations.get(1).getUri(), is(dir1.toURI())); + assertThat(locations.get(2).getStorageType(), is(StorageType.DISK)); + assertThat(locations.get(2).getUri(), is(dir2.toURI())); + + // Assert that a string with no storage type specified is + // correctly parsed and the default storage type is picked up. + String locations3 = "/dir0,/dir1"; + conf.set(DFS_DATANODE_DATA_DIR_KEY, locations3); + locations = new ArrayList<StorageLocation>(DataNode.getStorageLocations(conf)); + assertThat(locations.size(), is(2)); + assertThat(locations.get(0).getStorageType(), is(StorageType.DISK)); + assertThat(locations.get(0).getUri(), is(dir0.toURI())); + assertThat(locations.get(1).getStorageType(), is(StorageType.DISK)); + assertThat(locations.get(1).getUri(), is(dir1.toURI())); + } + + @Test (timeout = 30000) + public void testDataDirValidation() throws Throwable { DataNodeDiskChecker diskChecker = mock(DataNodeDiskChecker.class); doThrow(new IOException()).doThrow(new IOException()).doNothing() .when(diskChecker).checkDir(any(LocalFileSystem.class), any(Path.class)); LocalFileSystem fs = mock(LocalFileSystem.class); - Collection<URI> uris = Arrays.asList(new URI("file:/p1/"), - new URI("file:/p2/"), new URI("file:/p3/")); + AbstractList<StorageLocation> locations = new ArrayList<StorageLocation>(); - List<File> dirs = DataNode.getDataDirsFromURIs(uris, fs, diskChecker); - assertEquals("number of valid data dirs", 1, dirs.size()); - String validDir = dirs.iterator().next().getPath(); - assertEquals("p3 should be valid", new File("/p3").getPath(), validDir); + locations.add(new StorageLocation(new URI("file:/p1/"))); + locations.add(new StorageLocation(new URI("file:/p2/"))); + locations.add(new StorageLocation(new URI("file:/p3/"))); + + ArrayList<StorageLocation> checkedLocations = + DataNode.checkStorageLocations(locations, fs, diskChecker); + assertEquals("number of valid data dirs", 1, checkedLocations.size()); + String validDir = checkedLocations.iterator().next().getFile().getPath(); + assertThat("p3 should be valid", new File("/p3/").getPath(), is(validDir)); } }