Author: cutting
Date: Tue Jan 16 15:57:34 2007
New Revision: 496897

URL: http://svn.apache.org/viewvc?view=rev&rev=496897
Log:
HADOOP-855. In HDFS, try to repair files with checksum errors.
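For orientation before the per-file diffs, the sketch below shows in simplified form how the pieces added by this patch fit together on the read path: when a client hits a CRC mismatch, it wraps the block and datanode it is currently reading (for both the data stream and the checksum stream) into LocatedBlocks and reports them to the namenode, which invalidates corrupt replicas that still have other copies. This is illustrative only and is not code from the commit: the wrapper class and method are hypothetical, while the types and calls they use (DFSClient.DFSInputStream's getCurrentBlock()/getCurrentDatanode(), LocatedBlock, and DFSClient.reportBadBlocks()) are the ones introduced in the diffs that follow.

    // Hypothetical walkthrough class; not part of the patch. It sits in
    // org.apache.hadoop.dfs only so that types which may be package-private
    // (Block, DatanodeInfo, LocatedBlock, DFSClient) are visible to it.
    package org.apache.hadoop.dfs;

    import java.io.IOException;

    class ChecksumRepairWalkthrough {
      /**
       * Client side: on a CRC mismatch, report the block/datanode currently
       * being read in both the data stream and the checksum stream.
       */
      static void reportCorruptReplicas(DFSClient dfs,
                                        DFSClient.DFSInputStream data,
                                        DFSClient.DFSInputStream sums)
          throws IOException {
        LocatedBlock[] bad = {
          new LocatedBlock(data.getCurrentBlock(),
                           new DatanodeInfo[] { data.getCurrentDatanode() }),
          new LocatedBlock(sums.getCurrentBlock(),
                           new DatanodeInfo[] { sums.getCurrentDatanode() })
        };
        dfs.reportBadBlocks(bad);  // new ClientProtocol (version 6) RPC
      }
    }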
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jan 16 15:57:34 2007
@@ -28,6 +28,10 @@
  7. HADOOP-801. Add to jobtracker a log of task completion events.
    (Sanjay Dahiya via cutting)
 
+ 8. HADOOP-855. In HDFS, try to repair files with checksum errors.
+    An exception is still thrown, but corrupt blocks are now removed
+    when they have replicas. (Wendy Chien via cutting)
+
 Release 0.10.1 - 2007-01-10

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Tue Jan 16 15:57:34 2007
@@ -29,7 +29,7 @@
  **********************************************************************/
 interface ClientProtocol extends VersionedProtocol {
 
-  public static final long versionID = 5L; // open() takes a new parameter
+  public static final long versionID = 6L; // reportBadBlocks added
 
   ///////////////////////////////////////
   // File contents
@@ -142,6 +142,13 @@
      * times before succeeding.
      */
     public boolean complete(String src, String clientName) throws IOException;
+
+    /**
+     * The client wants to report corrupted blocks (blocks with specified
+     * locations on datanodes).
+     * @param blocks Array of located blocks to report
+     */
+    public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
 
     ///////////////////////////////////////
     // Namespace management

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue Jan 16 15:57:34 2007
@@ -179,7 +179,14 @@
             }
         }
     }
-
+
+    /**
+     * Report corrupt blocks that were discovered by the client.
+     */
+    public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+        namenode.reportBadBlocks(blocks);
+    }
+
     public short getDefaultReplication() {
         return defaultReplication;
     }
@@ -496,10 +503,12 @@
         private DataInputStream blockStream;
         private Block blocks[] = null;
         private DatanodeInfo nodes[][] = null;
+        private DatanodeInfo currentNode = null;
+        private Block currentBlock = null;
         private long pos = 0;
         private long filelen = 0;
         private long blockEnd = -1;
-
+
         /**
          */
         public DFSInputStream(String src) throws IOException {
@@ -538,8 +547,24 @@
             }
             this.blocks = newBlocks;
             this.nodes = (DatanodeInfo[][]) nodeV.toArray(new DatanodeInfo[nodeV.size()][]);
+            this.currentNode = null;
+        }
+
+        /**
+         * Returns the datanode from which the stream is currently reading.
+         */
+        public DatanodeInfo getCurrentDatanode() {
+            return currentNode;
+        }
+
+        /**
+         * Returns the block containing the target position.
+         */
+        public Block getCurrentBlock() {
+            return currentBlock;
         }
 
+
         /**
          * Used by the automatic tests to detemine blocks locations of a
         * file
@@ -623,6 +648,7 @@
 
                 this.pos = target;
                 this.blockEnd = targetBlockEnd;
+                this.currentBlock = blocks[targetBlock];
                 this.blockStream = in;
                 return chosenNode;
             } catch (IOException ex) {
@@ -671,7 +697,7 @@
             int result = -1;
             if (pos < filelen) {
                 if (pos > blockEnd) {
-                    blockSeekTo(pos, new TreeSet());
+                    currentNode = blockSeekTo(pos, new TreeSet());
                 }
                 result = blockStream.read();
                 if (result >= 0) {
@@ -691,7 +717,6 @@
             }
             if (pos < filelen) {
                 int retries = 2;
-                DatanodeInfo chosenNode = null;
                 TreeSet deadNodes = null;
                 while (retries > 0) {
                     try {
@@ -699,7 +724,7 @@
                         if (deadNodes == null) { deadNodes = new TreeSet(); }
-                        chosenNode = blockSeekTo(pos, deadNodes);
+                        currentNode = blockSeekTo(pos, deadNodes);
                     }
                     int realLen = Math.min(len, (int) (blockEnd - pos + 1));
                     int result = blockStream.read(buf, off, realLen);
@@ -711,7 +736,7 @@
                     LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
                     blockEnd = -1;
                     if (deadNodes == null) { deadNodes = new TreeSet(); }
-                    if (chosenNode != null) { deadNodes.add(chosenNode); }
+                    if (currentNode != null) { deadNodes.add(currentNode); }
                     if (--retries == 0) {
                         throw e;
                     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Tue Jan 16 15:57:34 2007
@@ -24,7 +24,7 @@
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.*;
 
 /****************************************************************
  * Implementation of the abstract FileSystem for the DFS system.
@@ -248,15 +248,51 @@
         return dfs;
     }
 
-    public void reportChecksumFailure(Path f, FSInputStream in,
-                                      long start, long length, int crc) {
+
+    /**
+     * We need to find the blocks that didn't match. Likely only one
+     * is corrupt but we will report both to the namenode. In the future,
+     * we can consider figuring out exactly which block is corrupt.
+     */
+    public void reportChecksumFailure(Path f,
+                                      FSInputStream in, long inPos,
+                                      FSInputStream sums, long sumsPos) {
-        // ignore for now, causing task to fail, and hope that when task is
-        // retried it gets a different copy of the block that is not corrupt.
+        LocatedBlock lblocks[] = new LocatedBlock[2];
+
+        try {
+            // Find block in data stream.
+            DFSClient.DFSInputStream dfsIn = (DFSClient.DFSInputStream) in;
+            Block dataBlock = dfsIn.getCurrentBlock();
+            if (dataBlock == null) {
+                throw new IOException("Error: Current block in data stream is null! ");
+            }
+            DatanodeInfo[] dataNode = {dfsIn.getCurrentDatanode()};
+            lblocks[0] = new LocatedBlock(dataBlock, dataNode);
+            LOG.info("Found checksum error in data stream at block=" + dataBlock.getBlockName() +
+                     " on datanode=" + dataNode[0].getName());
+
+            // Find block in checksum stream
+            DFSClient.DFSInputStream dfsSums = (DFSClient.DFSInputStream) sums;
+            Block sumsBlock = dfsSums.getCurrentBlock();
+            if (sumsBlock == null) {
+                throw new IOException("Error: Current block in checksum stream is null! ");
+            }
+            DatanodeInfo[] sumsNode = {dfsSums.getCurrentDatanode()};
+            lblocks[1] = new LocatedBlock(sumsBlock, sumsNode);
+            LOG.info("Found checksum error in checksum stream at block=" + sumsBlock.getBlockName() +
+                     " on datanode=" + sumsNode[0].getName());
+
+            // Ask client to delete blocks.
+            dfs.reportBadBlocks(lblocks);
+
+        } catch (IOException ie) {
+            LOG.info("Found corruption while reading " +
+                     f.toString() +
+                     ". Error repairing corrupt blocks. Bad blocks remain. " +
+                     StringUtils.stringifyException(ie));
+        }
-
-        // FIXME: we should move the bad block(s) involved to a bad block
-        // directory on their datanode, and then re-replicate the blocks, so that
-        // no data is lost. a task may fail, but on retry it should succeed.
     }
 
     /** Return the total raw capacity of the filesystem, disregarding

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue Jan 16 15:57:34 2007
@@ -766,6 +766,52 @@
         return true;
     }
 
+    /**
+     * Adds block to list of blocks which will be invalidated on
+     * specified datanode.
+     */
+    private void addToInvalidates(Block b, DatanodeInfo n) {
+        Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
+        if (invalidateSet == null) {
+            invalidateSet = new ArrayList<Block>();
+            recentInvalidateSets.put(n.getStorageID(), invalidateSet);
+        }
+        invalidateSet.add(b);
+    }
+
+    /**
+     * Invalidates the given block on the given datanode.
+     */
+    public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
+        throws IOException {
+        NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: "
+                                     + blk.getBlockName() + " on "
+                                     + dn.getName());
+        if (isInSafeMode()) {
+            throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
+        }
+
+        List<DatanodeDescriptor> containingNodes = blocksMap.get(blk);
+
+        // Check how many copies we have of the block. If we have at least one
+        // copy on a live node, then we can delete it.
+        if (containingNodes != null ) {
+            if ((countContainingNodes(containingNodes) > 1) ||
+                ((countContainingNodes(containingNodes) == 1) &&
+                 (dn.isDecommissionInProgress() || dn.isDecommissioned()))) {
+                addToInvalidates(blk, dn);
+                removeStoredBlock(blk, getDatanode(dn));
+                NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
+                                             + blk.getBlockName() + " on "
+                                             + dn.getName() + " listed for deletion.");
+            } else {
+                NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
+                                             + blk.getBlockName() + " on "
+                                             + dn.getName() + " is the only copy and was not deleted.");
+            }
+        }
+    }
+
     ////////////////////////////////////////////////////////////////
     // Here's how to handle block-copy failure during client write:
     // -- As usual, the client's write should result in a streaming
@@ -807,12 +853,7 @@
         if (containingNodes != null) {
             for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
                 DatanodeDescriptor node = it.next();
-                Collection<Block> invalidateSet = recentInvalidateSets.get(node.getStorageID());
-                if (invalidateSet == null) {
-                    invalidateSet = new ArrayList<Block>();
-                    recentInvalidateSets.put(node.getStorageID(), invalidateSet);
-                }
-                invalidateSet.add(b);
+                addToInvalidates(b, node);
                 NameNode.stateChangeLog.debug("BLOCK* NameSystem.delete: "
                                               + b.getBlockName() + " is added to invalidSet of " + node.getName() );
             }
@@ -1732,7 +1773,7 @@
         // be-replicated list.
         //
         FSDirectory.INode fileINode = dir.getFileByBlock(block);
-        if( fileINode != null && (containingNodes.size() < fileINode.getReplication())) {
+        if( fileINode != null && (countContainingNodes(containingNodes) < fileINode.getReplication())) {
             synchronized (neededReplications) {
                 neededReplications.add(block);
             }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Tue Jan 16 15:57:34 2007
@@ -314,6 +314,25 @@
             throw new IOException("Could not complete write to file " + src + " by " + clientName);
         }
     }
+
+    /**
+     * The client has detected an error on the specified located blocks
+     * and is reporting them to the server. For now, the namenode will
+     * delete the blocks from the datanodes. In the future we might
+     * check the blocks are actually corrupt.
+     */
+    public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+        stateChangeLog.debug("*DIR* NameNode.reportBadBlocks");
+        for (int i = 0; i < blocks.length; i++) {
+            Block blk = blocks[i].getBlock();
+            DatanodeInfo[] nodes = blocks[i].getLocations();
+            for (int j = 0; j < nodes.length; j++) {
+                DatanodeInfo dn = nodes[j];
+                namesystem.invalidateBlock(blk, dn);
+            }
+        }
+    }
+
     /** */
     public String[][] getHints(String src, long start, long len) throws IOException {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java Tue Jan 16 15:57:34 2007
@@ -46,6 +46,7 @@
     private FSDataInputStream sums;
     private Checksum sum = new CRC32();
     private int inSum;
+    private FSInputStream sumsIn;
 
     public Checker(FileSystem fs, Path file, Configuration conf)
       throws IOException {
@@ -55,7 +56,8 @@
       this.file = file;
       Path sumFile = FileSystem.getChecksumFile(file);
       try {
-        this.sums = new FSDataInputStream(fs.openRaw(sumFile), conf);
+        sumsIn = fs.openRaw(sumFile);
+        this.sums = new FSDataInputStream(sumsIn, conf);
         byte[] version = new byte[VERSION.length];
         sums.readFully(version);
         if (!Arrays.equals(version, VERSION))
@@ -134,7 +136,7 @@
         if (crc != sumValue) {
           long pos = getPos() - delta;
           fs.reportChecksumFailure(file, (FSInputStream)in,
-                                   pos, bytesPerSum, crc);
+                                   pos, sumsIn, pos/bytesPerSum) ;
          throw new ChecksumException("Checksum error: "+file+" at "+pos);
         }
       }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Tue Jan 16 15:57:34 2007
@@ -879,13 +879,13 @@
      * Report a checksum error to the file system.
      * @param f the file name containing the error
      * @param in the stream open on the file
-     * @param start the position of the beginning of the bad data in the file
-     * @param length the length of the bad data in the file
-     * @param crc the expected CRC32 of the data
+     * @param inPos the position of the beginning of the bad data in the file
+     * @param sums the stream open on the checksum file
+     * @param sumsPos the position of the beginning of the bad data in the checksum file
      */
-    public abstract void reportChecksumFailure(Path f, FSInputStream in,
-                                               long start, long length,
-                                               int crc);
+    public abstract void reportChecksumFailure(Path f,
+                                               FSInputStream in, long inPos,
+                                               FSInputStream sums, long sumsPos);
 
     /**
      * Get the size for a particular file.
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Tue Jan 16 15:57:34 2007
@@ -367,7 +367,8 @@
     /** Moves files to a bad file directory on the same device, so that their
      * storage will not be reused. */
     public void reportChecksumFailure(Path p, FSInputStream in,
-                                      long start, long length, int crc) {
+                                      long inPos,
+                                      FSInputStream sums, long sumsPos) {
       try {
         // canonicalize f
         File f = pathToFile(p).getCanonicalFile();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java Tue Jan 16 15:57:34 2007
@@ -289,8 +289,9 @@
   }
 
   @Override
-  public void reportChecksumFailure(Path path, FSInputStream in,
-                                    long start, long length, int crc) {
+  public void reportChecksumFailure(Path f,
+                                    FSInputStream in, long inPos,
+                                    FSInputStream sums, long sumsPos) {
     // TODO: What to do here?
   }
 
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java Tue Jan 16 15:57:34 2007
@@ -388,9 +388,11 @@
   }
 
   @Override
-  public void reportChecksumFailure(
-      Path f, FSInputStream in, long start, long length, int crc) {
-    baseFS.reportChecksumFailure(f, in, start, length, crc);
+
+  public void reportChecksumFailure(Path f,
+                                    FSInputStream in, long inPos,
+                                    FSInputStream sums, long sumsPos) {
+    baseFS.reportChecksumFailure(f, in, inPos, sums, sumsPos);
   }
 
   @Override
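The namenode-side policy added in FSNamesystem.invalidateBlock above can be summarized as: a replica reported as corrupt is only scheduled for deletion when doing so cannot remove the last known copy of the block. The small sketch below restates that check on its own; it is illustrative only, the class and method names are hypothetical, and liveReplicas stands in for what countContainingNodes computes in the patch.

    // Hypothetical restatement of the deletion check in
    // FSNamesystem.invalidateBlock; not part of the patch.
    class InvalidateDecision {
      /**
       * @param liveReplicas number of known replicas of the block
       *                     (what countContainingNodes returns in the patch)
       * @param nodeLeaving  true if the reporting replica's datanode is
       *                     decommissioning or already decommissioned
       */
      static boolean mayDeleteCorruptReplica(int liveReplicas, boolean nodeLeaving) {
        // Mirrors the patch's condition: delete when more than one copy is known,
        // or when exactly one is known and the reporting datanode is leaving the
        // cluster; otherwise keep the replica and let the ChecksumException surface.
        return liveReplicas > 1 || (liveReplicas == 1 && nodeLeaving);
      }
    }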