Author: szetszwo
Date: Mon Aug 17 21:45:02 2009
New Revision: 805158
URL: http://svn.apache.org/viewvc?rev=805158&view=rev
Log:
HDFS-167. Fix a bug in DFSClient that caused infinite retries on write.
Contributed by Bill Zeller
Modified:
hadoop/common/branches/branch-0.20/CHANGES.txt
hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=805158&r1=805157&r2=805158&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Mon Aug 17 21:45:02 2009
@@ -209,6 +209,9 @@
MAPREDUCE-805. Fixes some deadlocks in the JobTracker due to the fact
the JobTracker lock hierarchy wasn't maintained in some JobInProgress
method calls. (Amar Kamat via ddas)
+
+ HDFS-167. Fix a bug in DFSClient that caused infinite retries on write.
+ (Bill Zeller via szetszwo)
Release 0.20.0 - 2009-04-15
Modified: hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=805158&r1=805157&r2=805158&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Mon Aug 17 21:45:02 2009
@@ -67,8 +67,8 @@
public static final Log LOG = LogFactory.getLog(DFSClient.class);
public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
- public final ClientProtocol namenode;
- final private ClientProtocol rpcNamenode;
+ public ClientProtocol namenode;
+ private ClientProtocol rpcNamenode;
final UnixUserGroupInformation ugi;
volatile boolean clientRunning = true;
Random r = new Random();
@@ -155,6 +155,31 @@
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
FileSystem.Statistics stats)
throws IOException {
+ this(conf, stats);
+ this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
+ this.namenode = createNamenode(this.rpcNamenode);
+ }
+
+ /**
+ * Create a new DFSClient connected to the given namenode
+ * and rpcNamenode objects.
+ *
+ * This constructor was written to allow easy testing of the DFSClient class.
+ * End users will most likely want to use one of the other constructors.
+ */
+ public DFSClient(ClientProtocol namenode, ClientProtocol rpcNamenode,
+ Configuration conf, FileSystem.Statistics stats)
+ throws IOException {
+ this(conf, stats);
+ this.namenode = namenode;
+ this.rpcNamenode = rpcNamenode;
+ }
+
+
+ private DFSClient(Configuration conf, FileSystem.Statistics stats)
+ throws IOException {
+
+
this.conf = conf;
this.stats = stats;
this.socketTimeout = conf.getInt("dfs.socket.timeout",
@@ -174,9 +199,6 @@
throw (IOException)(new IOException().initCause(e));
}
- this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
- this.namenode = createNamenode(rpcNamenode);
-
String taskId = conf.get("mapred.task.id");
if (taskId != null) {
this.clientName = "DFSClient_" + taskId;
@@ -2870,7 +2892,7 @@
private LocatedBlock locateFollowingBlock(long start
) throws IOException {
- int retries = 5;
+ int retries =
conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
long sleeptime = 400;
while (true) {
long localstart = System.currentTimeMillis();
@@ -2887,25 +2909,30 @@
throw ue; // no need to retry these exceptions
}
- if (--retries == 0 &&
- !NotReplicatedYetException.class.getName().
+ if (NotReplicatedYetException.class.getName().
equals(e.getClassName())) {
- throw e;
+
+ if (retries == 0) {
+ throw e;
+ } else {
+ --retries;
+ LOG.info(StringUtils.stringifyException(e));
+ if (System.currentTimeMillis() - localstart > 5000) {
+ LOG.info("Waiting for replication for "
+ + (System.currentTimeMillis() - localstart) / 1000
+ + " seconds");
+ }
+ try {
+ LOG.warn("NotReplicatedYetException sleeping " + src
+ + " retries left " + retries);
+ Thread.sleep(sleeptime);
+ sleeptime *= 2;
+ } catch (InterruptedException ie) {
+ }
+ }
} else {
- LOG.info(StringUtils.stringifyException(e));
- if (System.currentTimeMillis() - localstart > 5000) {
- LOG.info("Waiting for replication for " +
- (System.currentTimeMillis() - localstart)/1000 +
- " seconds");
- }
- try {
- LOG.warn("NotReplicatedYetException sleeping " + src +
- " retries left " + retries);
- Thread.sleep(sleeptime);
- sleeptime *= 2;
- } catch (InterruptedException ie) {
- }
- }
+ throw e;
+ }
}
}
}
Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=805158&r1=805157&r2=805158&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java Mon Aug 17 21:45:02 2009
@@ -21,10 +21,18 @@
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.server.common.*;
+import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.AccessControlException;
import junit.framework.TestCase;
@@ -34,6 +42,8 @@
* properly in case of errors.
*/
public class TestDFSClientRetries extends TestCase {
+ public static final Log LOG =
+ LogFactory.getLog(TestDFSClientRetries.class.getName());
// writes 'len' bytes of data to out.
private static void writeData(OutputStream out, int len) throws IOException {
@@ -97,4 +107,132 @@
}
// more tests related to different failure cases can be added here.
+
+ class TestNameNode implements ClientProtocol
+ {
+ int num_calls = 0;
+
+ // The total number of calls that can be made to addBlock
+ // before an exception is thrown
+ int num_calls_allowed;
+ public final String ADD_BLOCK_EXCEPTION = "Testing exception thrown from"
+ + "TestDFSClientRetries::"
+ + "TestNameNode::addBlock";
+ public final String RETRY_CONFIG
+ = "dfs.client.block.write.locateFollowingBlock.retries";
+
+ public TestNameNode(Configuration conf) throws IOException
+ {
+ // +1 because the configuration value is the number of retries and
+ // the first call is not a retry (e.g., 2 retries == 3 total
+ // calls allowed)
+ this.num_calls_allowed = conf.getInt(RETRY_CONFIG, 5) + 1;
+ }
+
+ public long getProtocolVersion(String protocol,
+ long clientVersion)
+ throws IOException
+ {
+ return versionID;
+ }
+
+ public LocatedBlock addBlock(String src, String clientName)
+ throws IOException
+ {
+ num_calls++;
+ if (num_calls > num_calls_allowed) {
+ throw new IOException("addBlock called more times than "
+ + RETRY_CONFIG
+ + " allows.");
+ } else {
+ throw new RemoteException(NotReplicatedYetException.class.getName(),
+ ADD_BLOCK_EXCEPTION);
+ }
+ }
+
+
+ // The following methods are stub methods that are not needed by this mock
class
+
+ public LocatedBlocks getBlockLocations(String src, long offset, long
length) throws IOException { return null; }
+
+ public void create(String src, FsPermission masked, String clientName,
boolean overwrite, short replication, long blockSize) throws IOException {}
+
+ public LocatedBlock append(String src, String clientName) throws
IOException { return null; }
+
+ public boolean setReplication(String src, short replication) throws
IOException { return false; }
+
+ public void setPermission(String src, FsPermission permission) throws
IOException {}
+
+ public void setOwner(String src, String username, String groupname) throws
IOException {}
+
+ public void abandonBlock(Block b, String src, String holder) throws
IOException {}
+
+ public boolean complete(String src, String clientName) throws IOException
{ return false; }
+
+ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {}
+
+ public boolean rename(String src, String dst) throws IOException { return
false; }
+
+ public boolean delete(String src) throws IOException { return false; }
+
+ public boolean delete(String src, boolean recursive) throws IOException {
return false; }
+
+ public boolean mkdirs(String src, FsPermission masked) throws IOException
{ return false; }
+
+ public FileStatus[] getListing(String src) throws IOException { return
null; }
+
+ public void renewLease(String clientName) throws IOException {}
+
+ public long[] getStats() throws IOException { return null; }
+
+ public DatanodeInfo[] getDatanodeReport(FSConstants.DatanodeReportType
type) throws IOException { return null; }
+
+ public long getPreferredBlockSize(String filename) throws IOException {
return 0; }
+
+ public boolean setSafeMode(FSConstants.SafeModeAction action) throws
IOException { return false; }
+
+ public void saveNamespace() throws IOException {}
+
+ public boolean restoreFailedStorage(String arg) throws
AccessControlException { return false; }
+
+ public void refreshNodes() throws IOException {}
+
+ public void finalizeUpgrade() throws IOException {}
+
+ public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction
action) throws IOException { return null; }
+
+ public void metaSave(String filename) throws IOException {}
+
+ public FileStatus getFileInfo(String src) throws IOException { return
null; }
+
+ public ContentSummary getContentSummary(String path) throws IOException {
return null; }
+
+ public void setQuota(String path, long namespaceQuota, long
diskspaceQuota) throws IOException {}
+
+ public void fsync(String src, String client) throws IOException {}
+
+ public void setTimes(String src, long mtime, long atime) throws
IOException {}
+
+ }
+
+ public void testNotYetReplicatedErrors() throws IOException
+ {
+ Configuration conf = new Configuration();
+
+ // allow 1 retry (2 total calls)
+ conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", 1);
+
+ TestNameNode tnn = new TestNameNode(conf);
+ DFSClient client = new DFSClient(tnn, tnn, conf, null);
+ OutputStream os = client.create("testfile", true);
+ os.write(20); // write one random byte
+
+ try {
+ os.close();
+ } catch (Exception e) {
+ assertTrue("Retries are not being stopped correctly",
+ e.getMessage().equals(tnn.ADD_BLOCK_EXCEPTION));
+ }
+ }
+
}