Author: cutting
Date: Fri Jan 26 13:49:38 2007
New Revision: 500370

URL: http://svn.apache.org/viewvc?view=rev&rev=500370
Log:
HADOOP-731.  When a checksum error is encountered on a file stored in HDFS,
try to find another replica.  Contributed by Wendy.
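The core of this change is the retry loop added to FSDataInputStream.read():
when checksum verification fails, the client rewinds, asks both the data
stream and the checksum stream to switch to a different source, and re-reads,
giving up after three attempts or as soon as no alternative source exists.
Below is a minimal sketch of that shape, not the committed code: DataSource,
ChecksumError, and readWithRetry are hypothetical stand-ins for
FSInputStream, ChecksumException, and the patched read(), and the real
implementation additionally rewinds and re-seeks the separate checksum
stream (sums) in step with the data stream.

import java.io.IOException;

public class ReplicaRetrySketch {

  /** Hypothetical stand-in for FSInputStream: a stream that may be able
   *  to switch to a different replica of its data. */
  interface DataSource {
    int read(byte[] buf, int off, int len) throws IOException;
    long getPos() throws IOException;
    /** Try to reposition on a different copy of the data;
     *  returns true if one was found. */
    boolean seekToNewSource(long targetPos) throws IOException;
  }

  /** Hypothetical stand-in for org.apache.hadoop.fs.ChecksumException. */
  static class ChecksumError extends IOException {
    ChecksumError(String msg) { super(msg); }
  }

  /** The retry shape used by the patched FSDataInputStream.read(). */
  static int readWithRetry(DataSource in, byte[] buf, int off, int len)
      throws IOException {
    int retriesLeft = 3;          // same retry bound as the patch
    long oldPos = in.getPos();    // position to rewind to on failure
    while (true) {
      retriesLeft--;
      try {
        // In the real code this read also feeds a checksum calculator,
        // and a mismatch surfaces as a checksum exception.
        return in.read(buf, off, len);
      } catch (ChecksumError ce) {
        // Give up if we are out of retries or no other replica exists;
        // otherwise loop and re-read from the new source.
        if (retriesLeft == 0 || !in.seekToNewSource(oldPos)) {
          throw ce;
        }
      }
    }
  }
}

The contract that makes this work is seekToNewSource(long): it both performs
the switch and reports whether a retry is worthwhile, so streams with only
one copy of their data (local, in-memory, S3) can simply return false and
fail fast.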
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jan 26 13:49:38 2007
@@ -63,6 +63,10 @@
 19. HADOOP-909.  Fix the 'du' command to correctly compute the size of
     FileSystem directory trees.  (Hairong Kuang via cutting)
 
+20. HADOOP-731.  When a checksum error is encountered on a file stored
+    in HDFS, try another replica of the data, if any.
+    (Wendy Chien via cutting)
+
 
 Release 0.10.1 - 2007-01-10

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Fri Jan 26 13:49:38 2007
@@ -618,7 +618,7 @@
         DNAddrPair retval = chooseDataNode(targetBlock, deadNodes);
         chosenNode = retval.info;
         InetSocketAddress targetAddr = retval.addr;
-        
+
         try {
           s = new Socket();
           s.connect(targetAddr, READ_TIMEOUT);
@@ -764,7 +764,7 @@
         if (nodes[blockId] == null || nodes[blockId].length == 0) {
           LOG.info("No node available for block: " + blockInfo);
         }
-        LOG.info("Could not obtain block from any node: " + ie);
+        LOG.info("Could not obtain block " + blockId + " from any node: " + ie);
         try {
           Thread.sleep(3000);
         } catch (InterruptedException iex) {
@@ -889,6 +889,24 @@
       blockEnd = -1;
     }
 
+    /**
+     * Seek to the given position on a node other than the current node.
+     * Returns true if a node other than the current one is found;
+     * returns false if no other node could be found.
+     */
+    public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+      TreeSet excludeNodes = new TreeSet();
+      excludeNodes.add(currentNode);
+      String oldNodeID = currentNode.getStorageID();
+      DatanodeInfo newNode = blockSeekTo(targetPos, excludeNodes);
+      if (!oldNodeID.equals(newNode.getStorageID())) {
+        currentNode = newNode;
+        return true;
+      } else {
+        return false;
+      }
+    }
+
     /**
      */
     public synchronized long getPos() throws IOException {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java Fri Jan 26 13:49:38 2007
@@ -90,34 +90,61 @@
     }
 
     public int read(byte b[], int off, int len) throws IOException {
-      int read = in.read(b, off, len);
+      int read;
+      boolean retry;
+      int retriesLeft = 3;
+      long oldPos = getPos();
+      do {
+        retriesLeft--;
+        retry = false;
 
-      if (sums != null) {
-        int summed = 0;
-        while (summed < read) {
-          
-          int goal = bytesPerSum - inSum;
-          int inBuf = read - summed;
-          int toSum = inBuf <= goal ? inBuf : goal;
-          
+        read = in.read(b, off, len);
+
+        if (sums != null) {
+          long oldSumsPos = sums.getPos();
           try {
-            sum.update(b, off+summed, toSum);
-          } catch (ArrayIndexOutOfBoundsException e) {
-            throw new RuntimeException("Summer buffer overflow b.len=" +
-                                       b.length + ", off=" + off +
-                                       ", summed=" + summed + ", read=" +
-                                       read + ", bytesPerSum=" + bytesPerSum +
-                                       ", inSum=" + inSum, e);
-          }
-          summed += toSum;
+            int summed = 0;
+            while (summed < read) {
+              int goal = bytesPerSum - inSum;
+              int inBuf = read - summed;
+              int toSum = inBuf <= goal ? inBuf : goal;
 
-          inSum += toSum;
-          if (inSum == bytesPerSum) {
-            verifySum(read-(summed-bytesPerSum));
+              try {
+                sum.update(b, off+summed, toSum);
+              } catch (ArrayIndexOutOfBoundsException e) {
+                throw new RuntimeException("Summer buffer overflow b.len=" +
+                                           b.length + ", off=" + off +
+                                           ", summed=" + summed + ", read=" +
+                                           read + ", bytesPerSum=" + bytesPerSum +
+                                           ", inSum=" + inSum, e);
+              }
+              summed += toSum;
+
+              inSum += toSum;
+              if (inSum == bytesPerSum) {
+                verifySum(read-(summed-bytesPerSum));
+              }
+            }
+          } catch (ChecksumException ce) {
+            LOG.info("Found checksum error: " + StringUtils.stringifyException(ce));
+            if (retriesLeft == 0) {
+              throw ce;
+            }
+            sums.seek(oldSumsPos);
+            if (!((FSInputStream)in).seekToNewSource(oldPos) ||
+                !((FSInputStream)sumsIn).seekToNewSource(oldSumsPos)) {
+              // Neither the data stream nor the checksum stream is being
+              // read from a different source, so retrying the read would
+              // hit the same checksum error; rethrow instead.
+              throw ce;
+            } else {
+              // At least one of the sources is different, so the read
+              // might succeed; retry.
+              retry = true;
+            }
           }
         }
-      }
-      
+      } while (retry);
       return read;
     }
@@ -270,7 +297,11 @@
     public FSDataInputStream(FileSystem fs, Path file, int bufferSize, Configuration conf)
       throws IOException {
       super(null);
-      this.in = new Buffer(new PositionCache(new Checker(fs, file, conf)), bufferSize);
+      Checker chkr = new Checker(fs, file, conf);    // sets bytesPerSum
+      if (bufferSize % bytesPerSum != 0) {
+        throw new IOException("Buffer size must be multiple of " + bytesPerSum);
+      }
+      this.in = new Buffer(new PositionCache(chkr), bufferSize);
     }
@@ -278,7 +309,11 @@
       throws IOException {
       super(null);
       int bufferSize = conf.getInt("io.file.buffer.size", 4096);
-      this.in = new Buffer(new PositionCache(new Checker(fs, file, conf)), bufferSize);
+      Checker chkr = new Checker(fs, file, conf);    // sets bytesPerSum
+      if (bufferSize % bytesPerSum != 0) {
+        throw new IOException("Buffer size must be multiple of " + bytesPerSum);
+      }
+      this.in = new Buffer(new PositionCache(chkr), bufferSize);
     }
 
     /** Construct without checksums. */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java Fri Jan 26 13:49:38 2007
@@ -38,7 +38,13 @@
      * Return the current offset from the start of the file
      */
     public abstract long getPos() throws IOException;
-    
+
+    /**
+     * Seeks a different copy of the data.  Returns true if
+     * a new source is found, false otherwise.
+     */
+    public abstract boolean seekToNewSource(long targetPos) throws IOException;
+
     public int read(long position, byte[] buffer, int offset, int length)
       throws IOException {
       synchronized (this) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java Fri Jan 26 13:49:38 2007
@@ -114,6 +114,10 @@
       din.reset(fAttr.data, (int)pos, fAttr.size - (int)pos);
     }
 
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
+    }
+
     public int available() throws IOException {
       return din.available();
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Fri Jan 26 13:49:38 2007
@@ -95,6 +95,10 @@
       return fis.getChannel().position();
     }
 
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
+    }
+
     /*
      * Just forward to the fis
      */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java Fri Jan 26 13:49:38 2007
@@ -61,6 +61,11 @@
   }
 
   @Override
+  public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  @Override
   public synchronized int read() throws IOException {
     if (closed) {
       throw new IOException("Stream closed");
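On the HDFS side, the new DFSClient.seekToNewSource finds a new replica by
excluding the datanode currently being read from, re-running replica
selection, and comparing storage IDs to confirm the client actually moved.
The sketch below restates that logic outside DFSClient under simplifying
assumptions: Node, pickReplica, and storageId are invented stand-ins for
DatanodeInfo, blockSeekTo(targetPos, excludeNodes), and getStorageID().

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/** Illustrative stand-in for a datanode replica location. */
class Node {
  final String storageId;
  Node(String storageId) { this.storageId = storageId; }
}

/** Schematic of DFSClient's new seekToNewSource: exclude the node we were
 *  reading from, re-run replica selection, and report whether we actually
 *  ended up somewhere else. */
class ReplicatedStreamSketch {
  private Node currentNode = new Node("dn-1");

  public synchronized boolean seekToNewSource(long targetPos) throws IOException {
    Set<Node> excludeNodes = new HashSet<Node>();
    excludeNodes.add(currentNode);                        // don't pick the failing node again
    String oldNodeId = currentNode.storageId;
    Node newNode = pickReplica(targetPos, excludeNodes);  // may fall back to the same node
    if (!oldNodeId.equals(newNode.storageId)) {
      currentNode = newNode;
      return true;    // found a different replica; caller may retry the read
    }
    return false;     // no alternative replica; caller should rethrow
  }

  private Node pickReplica(long targetPos, Set<Node> excludeNodes) throws IOException {
    // Placeholder: the real blockSeekTo() consults the block map and
    // connects to a datanode not in excludeNodes.
    return currentNode;
  }
}

Note that the storage-ID comparison, rather than a boolean from replica
selection, is what decides whether a new source was found: if selection
falls back to the same datanode, the method reports failure and the
checksum retry loop in FSDataInputStream rethrows the original error.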