Author: rangadi
Date: Tue Jul 8 12:42:30 2008
New Revision: 674926
URL: http://svn.apache.org/viewvc?rev=674926&view=rev
Log:
HADOOP-3673. Avoid deadlock caused by DataNode RPC recoverBlock().
(Tsz Wo (Nicholas), SZE via rangadi)
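In brief, the deadlock fix has two cooperating parts, both visible in the diffs below. First, DFSClient now picks the smallest DatanodeID (ordered by its host:port name) as the primary for block recovery, so every client choosing among the same set of replicas arrives at the same primary. Second, the DataNode-side recovery path passes the local DataNode into LeaseManager, so a primary that appears in its own target list calls itself directly instead of opening an RPC proxy to itself. With dfs.datanode.handler.count as low as 1 (the setting used by the new test), either of the old behaviors could leave RPC handler threads blocked waiting on one another.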
Modified:
hadoop/core/branches/branch-0.18/CHANGES.txt
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DFSClient.java
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DatanodeID.java
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSImage.java
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/LeaseManager.java
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestFileCreation.java
Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=674926&r1=674925&r2=674926&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Tue Jul 8 12:42:30 2008
@@ -98,6 +98,9 @@
a RawComparator for NullWritable and permit it to be written as a key
to SequenceFiles. (cdouglas)
+ HADOOP-3673. Avoid deadlock caused by DataNode RPC recoverBlock().
+ (Tsz Wo (Nicholas), SZE via rangadi)
+
NEW FEATURES
HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,
Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DFSClient.java?rev=674926&r1=674925&r2=674926&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DFSClient.java Tue Jul 8 12:42:30 2008
@@ -2178,10 +2178,12 @@
// by stamping appropriate generation stamps.
//
Block newBlock = null;
- ClientDatanodeProtocol datanodeRPC = null;
+ ClientDatanodeProtocol primary = null;
try {
- datanodeRPC = createClientDatanodeProtocolProxy(newnodes[0], conf);
- newBlock = datanodeRPC.recoverBlock(block, newnodes);
+ // Pick the "least" datanode as the primary datanode to avoid deadlock.
+ primary = createClientDatanodeProtocolProxy(
+ Collections.min(Arrays.asList(newnodes)), conf);
+ newBlock = primary.recoverBlock(block, newnodes);
} catch (IOException e) {
recoveryErrorCount++;
if (recoveryErrorCount > maxRecoveryErrorCount) {
@@ -2201,7 +2203,7 @@
" times. Will retry...");
return true; // sleep when we return from here
} finally {
- RPC.stopProxy(datanodeRPC);
+ RPC.stopProxy(primary);
}
recoveryErrorCount = 0; // block recovery successful
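A minimal sketch of the selection idea above, in plain Java; NodeId and the host:port strings here are illustrative stand-ins rather than the real DatanodeID API. Because Collections.min imposes a single total order, clients that see the same replicas in different orders still agree on one primary:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PrimarySelection {
  // Stand-in for DatanodeID: comparable by its "host:port" name.
  static class NodeId implements Comparable<NodeId> {
    final String name;
    NodeId(String name) { this.name = name; }
    public int compareTo(NodeId that) { return name.compareTo(that.name); }
    public String toString() { return name; }
  }

  public static void main(String[] args) {
    // Two clients may see the same replicas in different orders...
    List<NodeId> seenByClientA = Arrays.asList(
        new NodeId("dn1:50010"), new NodeId("dn2:50010"));
    List<NodeId> seenByClientB = Arrays.asList(
        new NodeId("dn2:50010"), new NodeId("dn1:50010"));
    // ...but Collections.min gives both the same primary (dn1:50010),
    // so they cannot pick two primaries that then wait on each other.
    System.out.println("A picks " + Collections.min(seenByClientA));
    System.out.println("B picks " + Collections.min(seenByClientB));
  }
}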
Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java?rev=674926&r1=674925&r2=674926&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java Tue Jul 8 12:42:30 2008
@@ -3089,7 +3089,7 @@
Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
Daemon d = new Daemon(threadGroup, new Runnable() {
public void run() {
- LeaseManager.recoverBlocks(blocks, targets, namenode, getConf());
+ LeaseManager.recoverBlocks(blocks, targets, DataNode.this, namenode, getConf());
}
});
d.start();
@@ -3127,7 +3127,7 @@
public Block recoverBlock(Block block, DatanodeInfo[] targets
) throws IOException {
LOG.info("Client invoking recoverBlock for block " + block);
- return LeaseManager.recoverBlock(block, targets, namenode,
+ return LeaseManager.recoverBlock(block, targets, this, namenode,
getConf(), false);
}
}
Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DatanodeID.java?rev=674926&r1=674925&r2=674926&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DatanodeID.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DatanodeID.java Tue Jul 8 12:42:30 2008
@@ -31,7 +31,7 @@
* which it currently represents.
*
*/
-public class DatanodeID implements WritableComparable {
+public class DatanodeID implements WritableComparable<DatanodeID> {
static final DatanodeID[] EMPTY_ARRAY = {};
protected String name; /// hostname:portNumber
@@ -158,11 +158,11 @@
/** Comparable.
* Basis of compare is the String name (host:portNumber) only.
- * @param o
+ * @param that
* @return as specified by Comparable.
*/
- public int compareTo(Object o) {
- return name.compareTo(((DatanodeID)o).getName());
+ public int compareTo(DatanodeID that) {
+ return name.compareTo(that.getName());
}
/////////////////////////////////////////////////
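Generifying DatanodeID is what allows the Collections.min(Arrays.asList(newnodes)) call in DFSClient to compile cleanly: Collections.min requires elements that are Comparable<? super T>, and the typed compareTo(DatanodeID) also replaces the old runtime cast with a compile-time check. A hedged sketch of the difference, using a stand-in Id type rather than Hadoop's:

public class CompareToDemo {
  // Old style: raw Comparable forces a cast that can only fail at runtime.
  static class RawId implements Comparable {
    final String name;
    RawId(String name) { this.name = name; }
    public int compareTo(Object o) { return name.compareTo(((RawId) o).name); }
  }

  // New style: the generic bound moves the type check to compile time
  // and satisfies Collections.min's Comparable<? super T> requirement.
  static class Id implements Comparable<Id> {
    final String name;
    Id(String name) { this.name = name; }
    public int compareTo(Id that) { return name.compareTo(that.name); }
  }

  public static void main(String[] args) {
    Id a = new Id("dn1:50010"), b = new Id("dn2:50010");
    System.out.println(a.compareTo(b) < 0); // true: dn1 sorts first
  }
}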
Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSImage.java?rev=674926&r1=674925&r2=674926&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSImage.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSImage.java Tue Jul 8 12:42:30 2008
@@ -47,7 +47,7 @@
import org.apache.hadoop.dfs.FSConstants.StartupOption;
import org.apache.hadoop.dfs.FSConstants.NodeType;
import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
/**
@@ -1250,35 +1250,8 @@
* DatanodeImage is used to store persistent information
* about datanodes into the fsImage.
*/
- static class DatanodeImage implements WritableComparable {
- DatanodeDescriptor node;
-
- DatanodeImage() {
- node = new DatanodeDescriptor();
- }
-
- DatanodeImage(DatanodeDescriptor from) {
- node = from;
- }
-
- /**
- * Returns the underlying Datanode Descriptor
- */
- DatanodeDescriptor getDatanodeDescriptor() {
- return node;
- }
-
- public int compareTo(Object o) {
- return node.compareTo(o);
- }
-
- public boolean equals(Object o) {
- return node.equals(o);
- }
-
- public int hashCode() {
- return node.hashCode();
- }
+ static class DatanodeImage implements Writable {
+ DatanodeDescriptor node = new DatanodeDescriptor();
/////////////////////////////////////////////////
// Writable
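The FSImage change is a knock-on effect of the generics change above: DatanodeImage.compareTo(Object) delegated to node.compareTo(o), which no longer type-checks once DatanodeID.compareTo takes a DatanodeID. Since DatanodeImage exists only to serialize datanode state into the fsimage, Writable is all it ever needed; the comparison, equals, and hashCode overrides (and the constructors and accessor) are dropped, leaving a plain Writable wrapper around a DatanodeDescriptor.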
Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/LeaseManager.java?rev=674926&r1=674925&r2=674926&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/LeaseManager.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/LeaseManager.java Tue Jul 8 12:42:30 2008
@@ -406,10 +406,10 @@
* This method is invoked by the primary datanode.
*/
static void recoverBlocks(Block[] blocks, DatanodeID[][] targets,
- DatanodeProtocol namenode, Configuration conf) {
+ DataNode primary, DatanodeProtocol namenode, Configuration conf) {
for(int i = 0; i < blocks.length; i++) {
try {
- recoverBlock(blocks[i], targets[i], namenode, conf, true);
+ recoverBlock(blocks[i], targets[i], primary, namenode, conf, true);
} catch (IOException e) {
LOG.warn("recoverBlocks, i=" + i, e);
}
@@ -418,7 +418,7 @@
/** Recover a block */
static Block recoverBlock(Block block, DatanodeID[] datanodeids,
- DatanodeProtocol namenode, Configuration conf,
+ DataNode primary, DatanodeProtocol namenode, Configuration conf,
boolean closeFile) throws IOException {
// If the block is already being recovered, then skip recovering it.
@@ -447,8 +447,8 @@
//check generation stamps
for(DatanodeID id : datanodeids) {
try {
- InterDatanodeProtocol datanode
- = DataNode.createInterDataNodeProtocolProxy(id, conf);
+ InterDatanodeProtocol datanode = primary.dnRegistration.equals(id)?
+ primary: DataNode.createInterDataNodeProtocolProxy(id, conf);
BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
if (info != null && info.getGenerationStamp() >= block.generationStamp) {
syncList.add(new BlockRecord(id, datanode, new Block(info)));
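The equals check above is the second half of the fix: when the primary datanode is itself one of the recovery targets, it now invokes getBlockMetaDataInfo directly on its own DataNode object rather than opening an InterDatanodeProtocol proxy back to itself, a self-RPC that could block indefinitely when the datanode's handler threads are all busy. A hedged sketch of the pattern; Peer and Node are illustrative stand-ins, not the real Hadoop interfaces:

interface Peer {
  String blockInfo(String block);
}

class Node implements Peer {
  final String id;
  Node(String id) { this.id = id; }

  public String blockInfo(String block) {
    return block + " served locally by " + id;
  }

  // Mirrors `primary.dnRegistration.equals(id) ? primary : createProxy(id)`:
  // never open an RPC connection back to yourself.
  Peer peerFor(final String targetId) {
    if (id.equals(targetId)) {
      return this; // direct method call, no RPC handler thread consumed
    }
    return new Peer() { // stand-in for an RPC proxy to a remote datanode
      public String blockInfo(String block) {
        return block + " fetched over RPC from " + targetId;
      }
    };
  }
}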
Modified: hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestFileCreation.java?rev=674926&r1=674925&r2=674926&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestFileCreation.java (original)
+++ hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestFileCreation.java Tue Jul 8 12:42:30 2008
@@ -37,7 +37,7 @@
static final String DIR = "/" + TestFileCreation.class.getSimpleName() + "/";
{
- ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+ //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
}
@@ -686,4 +686,99 @@
System.out.println("testLeaseExpireHardLimit successful");
}
+
+ /** Test lease recovery triggered by DFSClient. */
+ public void testClientTriggeredLeaseRecovery() throws Exception {
+ final int REPLICATION = 3;
+ Configuration conf = new Configuration();
+ conf.setInt("dfs.datanode.handler.count", 1);
+ conf.setInt("dfs.replication", REPLICATION);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true, null);
+
+ try {
+ final FileSystem fs = cluster.getFileSystem();
+ final Path dir = new Path("/wrwelkj");
+
+ SlowWriter[] slowwriters = new SlowWriter[10];
+ for(int i = 0; i < slowwriters.length; i++) {
+ slowwriters[i] = new SlowWriter(fs, new Path(dir, "file" + i));
+ }
+
+ try {
+ for(int i = 0; i < slowwriters.length; i++) {
+ slowwriters[i].start();
+ }
+
+ //stop a datanode; this should trigger lease recovery.
+ cluster.stopDataNode(new Random().nextInt(REPLICATION));
+
+ //let the slow writers write for a few more seconds
+ System.out.println("Wait a few seconds");
+ Thread.sleep(5000);
+ }
+ finally {
+ for(int i = 0; i < slowwriters.length; i++) {
+ if (slowwriters[i] != null) {
+ slowwriters[i].interrupt();
+ }
+ }
+ for(int i = 0; i < slowwriters.length; i++) {
+ if (slowwriters[i] != null) {
+ slowwriters[i].join();
+ }
+ }
+ }
+
+ //Verify the file
+ System.out.println("Verify the file");
+ for(int i = 0; i < slowwriters.length; i++) {
+ System.out.println(slowwriters[i].filepath + ": length="
+ + fs.getFileStatus(slowwriters[i].filepath).getLen());
+ FSDataInputStream in = null;
+ try {
+ in = fs.open(slowwriters[i].filepath);
+ for(int j = 0, x; (x = in.read()) != -1; j++) {
+ assertEquals(j, x);
+ }
+ }
+ finally {
+ IOUtils.closeStream(in);
+ }
+ }
+ } finally {
+ if (cluster != null) {cluster.shutdown();}
+ }
+ }
+
+ static class SlowWriter extends Thread {
+ final FileSystem fs;
+ final Path filepath;
+
+ SlowWriter(FileSystem fs, Path filepath) {
+ super(SlowWriter.class.getSimpleName() + ":" + filepath);
+ this.fs = fs;
+ this.filepath = filepath;
+ }
+
+ public void run() {
+ FSDataOutputStream out = null;
+ int i = 0;
+ try {
+ out = fs.create(filepath);
+ for(; ; i++) {
+ System.out.println(getName() + " writes " + i);
+ out.write(i);
+ out.sync();
+ sleep(100);
+ }
+ }
+ catch(Exception e) {
+ System.out.println(getName() + " dies: e=" + e);
+ }
+ finally {
+ System.out.println(getName() + ": i=" + i);
+ IOUtils.closeStream(out);
+ }
+ }
+ }
}
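A note on how the new test exercises the fix: dfs.datanode.handler.count is set to 1 so each datanode has a single RPC handler thread, the configuration in which the old code could deadlock; stopping a random datanode while the ten SlowWriter threads keep writing and sync()ing forces their pipelines to fail and invoke recoverBlock; and the final read-back loop asserts that each recovered file still contains the byte sequence 0, 1, 2, ... that its writer produced.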