Author: stack
Date: Tue Feb 2 06:25:36 2010
New Revision: 905527

URL: http://svn.apache.org/viewvc?rev=905527&view=rev
Log:
HDFS-927 DFSInputStream retries too many times for new block location
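
The gist of the change, as a rough sketch (this is not the patched DFSClient code; the class and method names below are illustrative, and only the configuration key and the reset-per-operation behaviour come from the patch): each user-facing read starts with a fresh failure budget, and every trip back to the namenode for new block locations counts against a cap read from dfs.client.max.block.acquire.failures.

// Illustrative sketch only -- not the code in this patch. It mirrors the retry
// accounting that the patch moves from "once per stream" to "once per read".
class PerOperationRetryBudget {
  // Assumed default; DFSClient reads the real value from the configuration.
  static final int DEFAULT_MAX_BLOCK_ACQUIRE_FAILURES = 3;
  private final int maxBlockAcquireFailures;  // from dfs.client.max.block.acquire.failures
  private int failures = 0;                   // reset for every user-facing call

  PerOperationRetryBudget(int configuredMax) {
    this.maxBlockAcquireFailures =
        configuredMax > 0 ? configuredMax : DEFAULT_MAX_BLOCK_ACQUIRE_FAILURES;
  }

  /** Called at the top of each user-facing read: start a fresh budget. */
  void startOperation() {
    failures = 0;
  }

  /** Called each time the client must go back to the namenode for new block locations. */
  void recordLocationRefetch() throws java.io.IOException {
    if (++failures > maxBlockAcquireFailures) {
      throw new java.io.IOException("Could not obtain block: too many retries");
    }
  }
}

In the actual patch this shows up as removing the failures = 0 reset from the internal block-fetch helpers and doing it instead at the top of the user-facing read paths, as the DFSClient.java hunks below show.
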
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=905527&r1=905526&r2=905527&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Feb 2 06:25:36 2010
@@ -113,6 +113,9 @@
     HDFS-922. Remove unnecessary semicolon added by HDFS-877 that causes
     problems for Eclipse compilation. (jghoman)
 
+    HDFS-927 DFSInputStream retries too many times for new block locations
+    (Todd Lipcon via Stack)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=905527&r1=905526&r2=905527&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Feb 2 06:25:36 2010
@@ -259,9 +259,7 @@
     // dfs.write.packet.size is an internal config variable
     this.writePacketSize = conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
                                        DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
-    this.maxBlockAcquireFailures =
-      conf.getInt("dfs.client.max.block.acquire.failures",
-                  MAX_BLOCK_ACQUIRE_FAILURES);
+    this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
 
     // The hdfsTimeout is currently the same as the ipc timeout
     this.hdfsTimeout = Client.getTimeout(conf);
@@ -332,6 +330,11 @@
     }
   }
 
+  static int getMaxBlockAcquireFailures(Configuration conf) {
+    return conf.getInt("dfs.client.max.block.acquire.failures",
+                       MAX_BLOCK_ACQUIRE_FAILURES);
+  }
+
   /**
    * Get server default values for a number of configuration params.
    */
@@ -1751,7 +1754,7 @@
    * DFSInputStream provides bytes from a named file. It handles
    * negotiation of the namenode and various datanodes as necessary.
    ****************************************************************/
-  private class DFSInputStream extends FSInputStream {
+  class DFSInputStream extends FSInputStream {
     private Socket s = null;
     private boolean closed = false;
 
@@ -1765,6 +1768,18 @@
     private Block currentBlock = null;
     private long pos = 0;
     private long blockEnd = -1;
+
+    /**
+     * This variable tracks the number of failures since the start of the
+     * most recent user-facing operation. That is to say, it should be reset
+     * whenever the user makes a call on this stream, and if at any point
+     * during the retry logic, the failure count exceeds a threshold,
+     * the errors will be thrown back to the operation.
+     *
+     * Specifically this counts the number of times the client has gone
+     * back to the namenode to get a new list of block locations, and is
+     * capped at maxBlockAcquireFailures
+     */
     private int failures = 0;
     private int timeWindow = 3000; // wait time window (in msec) if BlockMissingException is caught
 
@@ -2027,7 +2042,6 @@
       //
       DatanodeInfo chosenNode = null;
       int refetchToken = 1; // only need to get a new access token once
-      failures = 0;
       while (true) {
         //
@@ -2178,6 +2192,7 @@
       if (closed) {
         throw new IOException("Stream closed");
       }
+      failures = 0;
       if (pos < getFileLength()) {
         int retries = 2;
         while (retries > 0) {
@@ -2270,7 +2285,6 @@
       //
       Socket dn = null;
       int refetchToken = 1; // only need to get a new access token once
-      failures = 0;
       while (true) {
         // cached block locations may have been updated by chooseDataNode()
@@ -2347,6 +2361,7 @@
       if (closed) {
         throw new IOException("Stream closed");
       }
+      failures = 0;
       long filelen = getFileLength();
       if ((position < 0) || (position >= filelen)) {
         return -1;

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java?rev=905527&r1=905526&r2=905527&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java Tue Feb 2 06:25:36 2010
@@ -20,14 +20,18 @@
 
 import java.io.File;
 import java.io.RandomAccessFile;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.Random;
 
-import junit.framework.TestCase;
+import org.junit.Test;
+import static org.junit.Assert.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 
 /**
  * A JUnit test for corrupted file handling.
@@ -56,18 +60,7 @@
  * increase replication factor of file to 3. verify that the new
  * replica was created from the non-corrupted replica.
  */
-public class TestCrcCorruption extends TestCase {
-
-  public TestCrcCorruption(String testName) {
-    super(testName);
-  }
-
-  protected void setUp() throws Exception {
-  }
-
-  protected void tearDown() throws Exception {
-  }
-
+public class TestCrcCorruption {
   /**
    * check if DFS can handle corrupted CRC blocks
    */
@@ -202,6 +195,7 @@
     }
   }
 
+  @Test
   public void testCrcCorruption() throws Exception {
     //
     // default parameters
@@ -222,4 +216,59 @@
     DFSTestUtil util2 = new DFSTestUtil("TestCrcCorruption", 40, 3, 400);
     thistest(conf2, util2);
   }
+
+
+  /**
+   * Make a single-DN cluster, corrupt a block, and make sure
+   * there's no infinite loop, but rather it eventually
+   * reports the exception to the client.
+   */
+  @Test(timeout=300000) // 5 min timeout
+  public void testEntirelyCorruptFileOneNode() throws Exception {
+    doTestEntirelyCorruptFile(1);
+  }
+
+  /**
+   * Same thing with multiple datanodes - in history, this has
+   * behaved differently than the above.
+   *
+   * This test usually completes in around 15 seconds - if it
+   * times out, this suggests that the client is retrying
+   * indefinitely.
+   */
+  @Test(timeout=300000) // 5 min timeout
+  public void testEntirelyCorruptFileThreeNodes() throws Exception {
+    doTestEntirelyCorruptFile(3);
+  }
+
+  private void doTestEntirelyCorruptFile(int numDataNodes) throws Exception {
+    long fileSize = 4096;
+    Path file = new Path("/testFile");
+
+    Configuration conf = new Configuration();
+    conf.setInt("dfs.replication", numDataNodes);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      DFSTestUtil.createFile(fs, file, fileSize, (short)numDataNodes, 12345L /*seed*/);
+      DFSTestUtil.waitReplication(fs, file, (short)numDataNodes);
+
+      String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
+      cluster.corruptBlockOnDataNodes(block);
+
+      try {
+        IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(), conf,
+                          true);
+        fail("Didn't get exception");
+      } catch (IOException ioe) {
+        DFSClient.LOG.info("Got expected exception", ioe);
+      }
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=905527&r1=905526&r2=905527&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Tue Feb 2 06:25:36 2010
@@ -20,7 +20,9 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.security.MessageDigest;
 
 import org.apache.commons.logging.Log;
@@ -29,11 +31,13 @@
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient.DFSInputStream;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.security.token.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.*;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.AccessControlException;
@@ -43,6 +47,9 @@
 
 import junit.framework.TestCase;
 
+import static org.mockito.Mockito.*;
+import org.mockito.stubbing.Answer;
+import org.mockito.invocation.InvocationOnMock;
 
 /**
  * These tests make sure that DFSClient retries fetching data from DFS
@@ -279,6 +286,129 @@
       e.getMessage().equals(tnn.ADD_BLOCK_EXCEPTION));
     }
   }
+
+  /**
+   * This tests that DFSInputStream failures are counted for a given read
+   * operation, and not over the lifetime of the stream. It is a regression
+   * test for HDFS-127.
+   */
+  public void testFailuresArePerOperation() throws Exception
+  {
+    long fileSize = 4096;
+    Path file = new Path("/testFile");
+
+    Configuration conf = new Configuration();
+    // Set short retry timeout so this test runs faster
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+
+    int maxBlockAcquires = DFSClient.getMaxBlockAcquireFailures(conf);
+    assertTrue(maxBlockAcquires > 0);
+
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      NameNode preSpyNN = cluster.getNameNode();
+      NameNode spyNN = spy(preSpyNN);
+      DFSClient client = new DFSClient(null, spyNN, conf, null);
+
+      DFSTestUtil.createFile(fs, file, fileSize, (short)1, 12345L /*seed*/);
+
+      // If the client will retry maxBlockAcquires times, then if we fail
+      // any more than that number of times, the operation should entirely
+      // fail.
+      doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires + 1))
+        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+      try {
+        IOUtils.copyBytes(client.open(file.toString()), new IOUtils.NullOutputStream(), conf,
+                          true);
+        fail("Didn't get exception");
+      } catch (IOException ioe) {
+        DFSClient.LOG.info("Got expected exception", ioe);
+      }
+
+      // If we fail exactly that many times, then it should succeed.
+      doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
+        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+      IOUtils.copyBytes(client.open(file.toString()), new IOUtils.NullOutputStream(), conf,
+                        true);
+
+      DFSClient.LOG.info("Starting test case for failure reset");
+
+      // Now the tricky case - if we fail a few times on one read, then succeed,
+      // then fail some more on another read, it shouldn't fail.
+      doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
+        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+      DFSInputStream is = client.open(file.toString());
+      byte buf[] = new byte[10];
+      IOUtils.readFully(is, buf, 0, buf.length);
+
+      DFSClient.LOG.info("First read successful after some failures.");
+
+      // Further reads at this point will succeed since it has the good block locations.
+      // So, force the block locations on this stream to be refreshed from bad info.
+      // When reading again, it should start from a fresh failure count, since
+      // we're starting a new operation on the user level.
+      doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
+        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+      is.openInfo();
+      // Seek to beginning forces a reopen of the BlockReader - otherwise it'll
+      // just keep reading on the existing stream and the fact that we've poisoned
+      // the block info won't do anything.
+      is.seek(0);
+      IOUtils.readFully(is, buf, 0, buf.length);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Mock Answer implementation of NN.getBlockLocations that will return
+   * a poisoned block list a certain number of times before returning
+   * a proper one.
+   */
+  private static class FailNTimesAnswer implements Answer<LocatedBlocks> {
+    private int failuresLeft;
+    private NameNode realNN;
+
+    public FailNTimesAnswer(NameNode realNN, int timesToFail) {
+      failuresLeft = timesToFail;
+      this.realNN = realNN;
+    }
+
+    public LocatedBlocks answer(InvocationOnMock invocation) throws IOException {
+      Object args[] = invocation.getArguments();
+      LocatedBlocks realAnswer = realNN.getBlockLocations(
+        (String)args[0],
+        (Long)args[1],
+        (Long)args[2]);
+
+      if (failuresLeft-- > 0) {
+        NameNode.LOG.info("FailNTimesAnswer injecting failure.");
+        return makeBadBlockList(realAnswer);
+      }
+      NameNode.LOG.info("FailNTimesAnswer no longer failing.");
+      return realAnswer;
+    }
+
+    private LocatedBlocks makeBadBlockList(LocatedBlocks goodBlockList) {
+      LocatedBlock goodLocatedBlock = goodBlockList.get(0);
+      LocatedBlock badLocatedBlock = new LocatedBlock(
+        goodLocatedBlock.getBlock(),
+        new DatanodeInfo[] {
+          new DatanodeInfo(new DatanodeID("255.255.255.255:234"))
+        },
+        goodLocatedBlock.getStartOffset(),
+        false);
+
+
+      List<LocatedBlock> badBlocks = new ArrayList<LocatedBlock>();
+      badBlocks.add(badLocatedBlock);
+      return new LocatedBlocks(goodBlockList.getFileLength(), false,
+                               badBlocks, null, true);
+    }
+  }
 
   /**