Author: cutting
Date: Tue Nov 28 15:50:39 2006
New Revision: 480291

URL: http://svn.apache.org/viewvc?view=rev&rev=480291
Log:
HADOOP-698. Fix HDFS client to not retry the same datanode on read failures. Contributed by Milind.
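For readers skimming the diff below: blockSeekTo() now reports which datanode it connected to, and read() records that node in a dead-node set before retrying, so the next attempt picks a different replica. The following is a minimal, self-contained sketch of that pattern, not the actual DFSClient code; the replica list and the pickNode()/fetchFrom() helpers are illustrative stand-ins.

// Minimal sketch of the HADOOP-698 pattern: remember which datanode each
// failed attempt used so the next retry picks a different replica.
// Node names and the pickNode()/fetchFrom() helpers are illustrative only.
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

public class DeadNodeRetrySketch {

  /** Pick the first replica that has not already failed. */
  static String pickNode(List<String> replicas, TreeSet<String> deadNodes)
      throws IOException {
    for (String node : replicas) {
      if (!deadNodes.contains(node)) {
        return node;
      }
    }
    throw new IOException("Could not obtain block: all replicas failed");
  }

  /** Stand-in for reading a block from a datanode; "dn1" always fails. */
  static byte[] fetchFrom(String node) throws IOException {
    if (node.equals("dn1")) {
      throw new IOException("Connection refused by " + node);
    }
    return new byte[] { 42 };
  }

  public static void main(String[] args) throws IOException {
    List<String> replicas = Arrays.asList("dn1", "dn2", "dn3");
    TreeSet<String> deadNodes = new TreeSet<String>();
    int retries = 2;
    while (true) {
      String chosen = pickNode(replicas, deadNodes);
      try {
        byte[] data = fetchFrom(chosen);
        System.out.println("Read " + data.length + " byte(s) from " + chosen);
        break;
      } catch (IOException e) {
        deadNodes.add(chosen);          // never retry the same datanode
        if (--retries == 0) {
          throw e;
        }
      }
    }
  }
}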
Added:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestRestartDFS.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=480291&r1=480290&r2=480291
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Nov 28 15:50:39 2006
@@ -134,6 +134,9 @@
 39. HADOOP-747. Fix record serialization to work correctly when records
     are embedded in Maps. (Milind Bhandarkar via cutting)
 
+40. HADOOP-698. Fix HDFS client not to retry the same datanode on
+    read failures. (Milind Bhandarkar via cutting)
+
 Release 0.8.0 - 2006-11-03
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=480291&r1=480290&r2=480291
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue Nov 28 15:50:39 2006
@@ -536,7 +536,7 @@
      * Open a DataInputStream to a DataNode so that it can be read from.
      * We get block ID and the IDs of the destinations at startup, from the namenode.
      */
-    private synchronized void blockSeekTo(long target) throws IOException {
+    private synchronized DatanodeInfo blockSeekTo(long target, TreeSet deadNodes) throws IOException {
       if (target >= filelen) {
         throw new IOException("Attempted to read past end of file");
       }
@@ -572,10 +572,10 @@
       // Connect to best DataNode for desired Block, with potential offset
       //
       int failures = 0;
-      TreeSet deadNodes = new TreeSet();
+      DatanodeInfo chosenNode = null;
       while (s == null) {
         DNAddrPair retval = chooseDataNode(targetBlock, deadNodes);
-        DatanodeInfo chosenNode = retval.info;
+        chosenNode = retval.info;
         InetSocketAddress targetAddr = retval.addr;
 
         try {
@@ -608,6 +608,7 @@
           this.pos = target;
           this.blockEnd = targetBlockEnd;
           this.blockStream = in;
+          return chosenNode;
         } catch (IOException ex) {
           // Put chosen node into dead list, continue
           LOG.debug("Failed to connect to " + targetAddr + ":" 
@@ -622,6 +623,7 @@
           s = null;
         }
       }
+      return chosenNode;
     }
 
     /**
@@ -653,7 +655,7 @@
       int result = -1;
       if (pos < filelen) {
         if (pos > blockEnd) {
-          blockSeekTo(pos);
+          blockSeekTo(pos, new TreeSet());
         }
         result = blockStream.read();
         if (result >= 0) {
@@ -673,10 +675,15 @@
       }
       if (pos < filelen) {
         int retries = 2;
+        DatanodeInfo chosenNode = null;
+        TreeSet deadNodes = null;
         while (retries > 0) {
           try {
             if (pos > blockEnd) {
-              blockSeekTo(pos);
+              if (deadNodes == null) {
+                deadNodes = new TreeSet();
+              }
+              chosenNode = blockSeekTo(pos, deadNodes);
             }
             int realLen = Math.min(len, (int) (blockEnd - pos + 1));
             int result = blockStream.read(buf, off, realLen);
@@ -687,6 +694,8 @@
           } catch (IOException e) {
             LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
             blockEnd = -1;
+            if (deadNodes == null) { deadNodes = new TreeSet(); }
+            if (chosenNode != null) { deadNodes.add(chosenNode); }
             if (--retries == 0) {
               throw e;
             }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?view=diff&rev=480291&r1=480290&r2=480291
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Tue Nov 28 15:50:39 2006
@@ -35,13 +35,12 @@
   private Thread dataNodeThreads[];
   private NameNodeRunner nameNode;
   private DataNodeRunner dataNodes[];
-  private int maxRetries = 10;
   private int MAX_RETRIES  = 10;
   private int MAX_RETRIES_PER_PORT = 10;
   private int nameNodePort = 0;
   private int nameNodeInfoPort = 0;
-  
+
 
   /**
    * An inner class that runs a name node.
    */
@@ -107,10 +106,12 @@
       String[] dirs = conf.getStrings("dfs.data.dir");
       for (int idx = 0; idx < dirs.length; idx++) {
         File dataDir = new File(dirs[idx]);
-        if (!dataDir.mkdirs()) {
-          if (!dataDir.isDirectory()) {
-            throw new RuntimeException("Mkdirs failed to create directory " +
-                                       dataDir.toString());
+        synchronized (DataNodeRunner.class) {
+          if (!dataDir.mkdirs()) {
+            if (!dataDir.isDirectory()) {
+              throw new RuntimeException("Mkdirs failed to create directory " +
+                                         dataDir.toString());
+            }
           }
         }
       }
@@ -143,7 +144,7 @@
   public MiniDFSCluster(int namenodePort, 
                         Configuration conf,
                         boolean dataNodeFirst) throws IOException {
-    this(namenodePort, conf, 1, dataNodeFirst);
+    this(namenodePort, conf, 1, dataNodeFirst, true);
   }
 
   /**
@@ -158,18 +159,36 @@
                         Configuration conf,
                         int nDatanodes,
                         boolean dataNodeFirst) throws IOException {
+    this(namenodePort, conf, nDatanodes, dataNodeFirst, true);
+  }
+
+  /**
+   * Create the config and start up the servers. If either the rpc or info port is already
+   * in use, we will try new ports.
+   * @param namenodePort suggestion for which rpc port to use. caller should use
+   *        getNameNodePort() to get the actual port used.
+   * @param nDatanodes Number of datanodes
+   * @param dataNodeFirst should the datanode be brought up before the namenode?
+   * @param formatNamenode should the namenode be formatted before starting up ?
+   */
+  public MiniDFSCluster(int namenodePort, 
+                        Configuration conf,
+                        int nDatanodes,
+                        boolean dataNodeFirst,
+                        boolean formatNamenode) throws IOException {
     this.conf = conf;
     this.nDatanodes = nDatanodes;
     this.nameNodePort = namenodePort;
-    this.nameNodeInfoPort = 50080;   // We just want this port to be different from the default.
+    this.nameNodeInfoPort = 50080;   // We just want this port to be different from the default. 
    File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
     File data_dir = new File(base_dir, "data");
     conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
              new File(base_dir, "name2").getPath());
     conf.setInt("dfs.replication", Math.min(3, nDatanodes));
+    conf.setInt("dfs.safemode.extension", 0);
     // this timeout seems to control the minimum time for the test, so
     // decrease it considerably.
     conf.setInt("ipc.client.timeout", 1000);
@@ -183,7 +202,7 @@
              "localhost:"+ Integer.toString(nameNodePort));
     conf.set("dfs.info.port", nameNodeInfoPort);
 
-    NameNode.format(conf);
+    if (formatNamenode) { NameNode.format(conf); }
     nameNode = new NameNodeRunner();
     nameNodeThread = new Thread(nameNode);
     dataNodes = new DataNodeRunner[nDatanodes];
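The five-argument MiniDFSCluster constructor added above is what lets a test shut a cluster down and bring it back up over the same storage. A hedged usage sketch (not part of this change set), assuming the test classes are on the classpath: port numbers are arbitrary, and the elided read/write steps are left as comments. The tests added below use exactly this shape.

// Hedged sketch: start a small cluster, shut it down, then restart it
// without re-formatting the namenode so the image and edits written by
// the first instance are reused.
package org.apache.hadoop.dfs;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;

public class MiniDFSClusterRestartSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // First start: the old four-argument constructor delegates with
    // formatNamenode = true, as the diff above shows.
    MiniDFSCluster cluster = new MiniDFSCluster(65314, conf, 2, false);
    try {
      // ... create files through the cluster's file system ...
    } finally {
      cluster.shutdown();
    }

    // Second start: formatNamenode = false preserves the existing namespace.
    conf = new Configuration();
    cluster = new MiniDFSCluster(65320, conf, 2, false, false);
    try {
      // ... verify the files written earlier are still readable ...
    } finally {
      cluster.shutdown();
    }
  }
}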

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java?view=auto&rev=480291
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java Tue Nov 28 15:50:39 2006
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import java.io.*;
+import java.util.Random;
+import junit.framework.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A JUnit test for corrupted file handling.
+ *
+ * @author Milind Bhandarkar
+ */
+public class TestFileCorruption extends TestCase {
+
+  private static final int NFILES = 20;
+  private static String TEST_ROOT_DIR =
+    new Path(System.getProperty("test.build.data","/tmp"))
+    .toString().replace(' ', '+');
+
+  /** class MyFile contains enough information to recreate the contents of
+   * a single file.
+   */
+  private static class MyFile {
+    private static Random gen = new Random();
+    private static final int MAX_LEVELS = 3;
+    private static final int MAX_SIZE = 8*1024;
+    private static String[] dirNames = {
+      "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"
+    };
+    private String name = "";
+    private int size;
+    private long seed;
+
+    MyFile() {
+      int nLevels = gen.nextInt(MAX_LEVELS);
+      if(nLevels != 0) {
+        int[] levels = new int[nLevels];
+        for (int idx = 0; idx < nLevels; idx++) {
+          levels[idx] = gen.nextInt(10);
+        }
+        StringBuffer sb = new StringBuffer();
+        for (int idx = 0; idx < nLevels; idx++) {
+          sb.append(dirNames[levels[idx]]);
+          sb.append("/");
+        }
+        name = sb.toString();
+      }
+      long fidx = -1;
+      while (fidx < 0) { fidx = gen.nextLong(); }
+      name = name + Long.toString(fidx);
+      size = gen.nextInt(MAX_SIZE);
+      seed = gen.nextLong();
+    }
+
+    String getName() { return name; }
+    int getSize() { return size; }
+    long getSeed() { return seed; }
+  }
+
+  public TestFileCorruption(String testName) {
+    super(testName);
+  }
+
+
+
+  protected void setUp() throws Exception {
+  }
+
+  protected void tearDown() throws Exception {
+  }
+
+  /** create NFILES with random names and directory hierarchies
+   * with random (but reproducible) data in them.
+   */
+  private static MyFile[] createFiles(String fsname, String topdir)
+    throws IOException {
+    MyFile[] files = new MyFile[NFILES];
+
+    for (int idx = 0; idx < NFILES; idx++) {
+      files[idx] = new MyFile();
+    }
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+
+    for (int idx = 0; idx < NFILES; idx++) {
+      Path fPath = new Path(root, files[idx].getName());
+      if (!fs.mkdirs(fPath.getParent())) {
+        throw new IOException("Mkdirs failed to create " + 
+                              fPath.getParent().toString());
+      }
+      FSDataOutputStream out = fs.create(fPath);
+      byte[] toWrite = new byte[files[idx].getSize()];
+      Random rb = new Random(files[idx].getSeed());
+      rb.nextBytes(toWrite);
+      out.write(toWrite);
+      out.close();
+      toWrite = null;
+    }
+
+    return files;
+  }
+
+  /** check if the files have been copied correctly.
+   */
+  private static boolean checkFiles(String fsname, String topdir, MyFile[] files)
+    throws IOException {
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+
+    for (int idx = 0; idx < NFILES; idx++) {
+      Path fPath = new Path(root, files[idx].getName());
+      FSDataInputStream in = fs.open(fPath);
+      byte[] toRead = new byte[files[idx].getSize()];
+      byte[] toCompare = new byte[files[idx].getSize()];
+      Random rb = new Random(files[idx].getSeed());
+      rb.nextBytes(toCompare);
+      assertEquals("Cannnot read file.", toRead.length, in.read(toRead));
+      in.close();
+      for (int i = 0; i < toRead.length; i++) {
+        if (toRead[i] != toCompare[i]) {
+          return false;
+        }
+      }
+      toRead = null;
+      toCompare = null;
+    }
+
+    return true;
+  }
+
+  /** delete directory and everything underneath it.*/
+  private static void deldir(String fsname, String topdir)
+    throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+    fs.delete(root);
+  }
+
+  /** check if DFS can handle corrupted blocks properly */
+  public void testFileCorruption() throws Exception {
+    String namenode = null;
+    MiniDFSCluster cluster = null;
+    MyFile[] files = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster(65314, conf, 3, false);
+      namenode = conf.get("fs.default.name", "local");
+      if (!"local".equals(namenode)) {
+        files = createFiles(namenode, "/srcdat");
+        // Now deliberately remove the blocks
+        File data_dir = new File(System.getProperty("test.build.data"),
+                                 "dfs/data/data5/data");
+        assertTrue("data directory does not exist", data_dir.exists());
+        File[] blocks = data_dir.listFiles();
+        assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length > 0));
+        for (int idx = 0; idx < blocks.length; idx++) {
+          if (!blocks[idx].getName().startsWith("blk_")) {
+            continue;
+          }
+          System.out.println("Deliberately removing file "+blocks[idx].getName());
+          assertTrue("Cannot remove file.", blocks[idx].delete());
+        }
+        assertTrue("Corrupted replicas not handled properly.",
+                   checkFiles(namenode, "/srcdat", files));
+        deldir(namenode, "/srcdat");
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestRestartDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestRestartDFS.java?view=auto&rev=480291
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestRestartDFS.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestRestartDFS.java Tue Nov 28 15:50:39 2006
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.Random;
+import junit.framework.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A JUnit test for checking if restarting DFS preserves integrity.
+ *
+ * @author Milind Bhandarkar
+ */
+public class TestRestartDFS extends TestCase {
+
+  private static final int NFILES = 20;
+  private static String TEST_ROOT_DIR =
+    new Path(System.getProperty("test.build.data","/tmp"))
+    .toString().replace(' ', '+');
+
+  /** class MyFile contains enough information to recreate the contents of
+   * a single file.
+   */
+  private static class MyFile {
+    private static Random gen = new Random();
+    private static final int MAX_LEVELS = 3;
+    private static final int MAX_SIZE = 8*1024;
+    private static String[] dirNames = {
+      "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"
+    };
+    private String name = "";
+    private int size;
+    private long seed;
+
+    MyFile() {
+      int nLevels = gen.nextInt(MAX_LEVELS);
+      if(nLevels != 0) {
+        int[] levels = new int[nLevels];
+        for (int idx = 0; idx < nLevels; idx++) {
+          levels[idx] = gen.nextInt(10);
+        }
+        StringBuffer sb = new StringBuffer();
+        for (int idx = 0; idx < nLevels; idx++) {
+          sb.append(dirNames[levels[idx]]);
+          sb.append("/");
+        }
+        name = sb.toString();
+      }
+      long fidx = -1;
+      while (fidx < 0) { fidx = gen.nextLong(); }
+      name = name + Long.toString(fidx);
+      size = gen.nextInt(MAX_SIZE);
+      seed = gen.nextLong();
+    }
+
+    String getName() { return name; }
+    int getSize() { return size; }
+    long getSeed() { return seed; }
+  }
+
+  public TestRestartDFS(String testName) {
+    super(testName);
+  }
+
+
+
+  protected void setUp() throws Exception {
+  }
+
+  protected void tearDown() throws Exception {
+  }
+
+  /** create NFILES with random names and directory hierarchies
+   * with random (but reproducible) data in them.
+   */
+  private static MyFile[] createFiles(String fsname, String topdir)
+    throws IOException {
+    MyFile[] files = new MyFile[NFILES];
+
+    for (int idx = 0; idx < NFILES; idx++) {
+      files[idx] = new MyFile();
+    }
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+
+    for (int idx = 0; idx < NFILES; idx++) {
+      Path fPath = new Path(root, files[idx].getName());
+      if (!fs.mkdirs(fPath.getParent())) {
+        throw new IOException("Mkdirs failed to create " + 
+                              fPath.getParent().toString());
+      }
+      FSDataOutputStream out = fs.create(fPath);
+      byte[] toWrite = new byte[files[idx].getSize()];
+      Random rb = new Random(files[idx].getSeed());
+      rb.nextBytes(toWrite);
+      out.write(toWrite);
+      out.close();
+      toWrite = null;
+    }
+
+    return files;
+  }
+
+  /** check if the files have been copied correctly.
+   */
+  private static boolean checkFiles(String fsname, String topdir, MyFile[] files)
+    throws IOException {
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+
+    for (int idx = 0; idx < NFILES; idx++) {
+      Path fPath = new Path(root, files[idx].getName());
+      FSDataInputStream in = fs.open(fPath);
+      byte[] toRead = new byte[files[idx].getSize()];
+      byte[] toCompare = new byte[files[idx].getSize()];
+      Random rb = new Random(files[idx].getSeed());
+      rb.nextBytes(toCompare);
+      assertEquals("Cannnot read file.", toRead.length, in.read(toRead));
+      in.close();
+      for (int i = 0; i < toRead.length; i++) {
+        if (toRead[i] != toCompare[i]) {
+          return false;
+        }
+      }
+      toRead = null;
+      toCompare = null;
+    }
+
+    return true;
+  }
+
+  /** delete directory and everything underneath it.*/
+  private static void deldir(String fsname, String topdir)
+    throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+    fs.delete(root);
+  }
+
+  /** check if DFS remains in proper condition after a restart */
+  public void testRestartDFS() throws Exception {
+    String namenode = null;
+    MiniDFSCluster cluster = null;
+    MyFile[] files = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster(65314, conf, 4, false);
+      namenode = conf.get("fs.default.name", "local");
+      if (!"local".equals(namenode)) {
+        files = createFiles(namenode, "/srcdat");
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+    assertTrue("Error creating files", files != null);
+    try {
+      Configuration conf = new Configuration();
+      // Here we restart the MiniDFScluster without formatting namenode
+      cluster = new MiniDFSCluster(65320, conf, 4, false, false);
+      namenode = conf.get("fs.default.name", "local");
+      if (!"local".equals(namenode)) {
+        assertTrue("Filesystem corrupted after restart.",
+                   checkFiles(namenode, "/srcdat", files));
+        deldir(namenode, "/srcdat");
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+}
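A side note on the verification technique both new tests share: file contents are generated from a recorded java.util.Random seed, so checkFiles() only has to regenerate the same byte stream rather than keep a reference copy of the data. In isolation the idea looks like this (self-contained sketch, not Hadoop code):

// Self-contained sketch of the seeded-Random verification trick used by
// createFiles()/checkFiles(): regenerate the expected bytes from the seed
// instead of storing a reference copy.
import java.util.Arrays;
import java.util.Random;

public class SeededDataSketch {
  public static void main(String[] args) {
    long seed = new Random().nextLong();

    // "Write": produce the payload from the seed.
    byte[] written = new byte[1024];
    new Random(seed).nextBytes(written);

    // "Verify": regenerate from the same seed and compare.
    byte[] expected = new byte[written.length];
    new Random(seed).nextBytes(expected);
    System.out.println("Data intact: " + Arrays.equals(written, expected));
  }
}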