Author: umamahesh
Date: Sun Jul 8 17:56:41 2012
New Revision: 1358798
URL: http://svn.apache.org/viewvc?rev=1358798&view=rev
Log:
Merge from trunk r:1358794 HDFS-3541. Deadlock between recovery, xceiver and
packet responder. Contributed by Vinay.
Submitted by: Vinay
Reviewed by: Uma Maheswara Rao G, Kihwal Lee, Aaron
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/ (props
changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
(props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project:r1358794
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1358794
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1358798&r1=1358797&r2=1358798&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
(original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Sun Jul 8 17:56:41 2012
@@ -262,6 +262,8 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3581. FSPermissionChecker#checkPermission sticky bit check
missing range check. (eli)
+ HDFS-3541. Deadlock between recovery, xceiver and packet responder (Vinay
via umamahesh)
+
BREAKDOWN OF HDFS-3042 SUBTASKS
HDFS-2185. HDFS portion of ZK-based FailoverController (todd)
Propchange:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1358794
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1358798&r1=1358797&r2=1358798&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
Sun Jul 8 17:56:41 2012
@@ -844,6 +844,7 @@ class BlockReceiver implements Closeable
try {
responder.join();
} catch (InterruptedException e) {
+ responder.interrupt();
throw new IOException("Interrupted receiveBlock");
}
responder = null;
@@ -1018,6 +1019,7 @@ class BlockReceiver implements Closeable
wait();
} catch (InterruptedException e) {
running = false;
+ Thread.currentThread().interrupt();
}
}
if(LOG.isDebugEnabled()) {
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1358798&r1=1358797&r2=1358798&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
Sun Jul 8 17:56:41 2012
@@ -838,6 +838,10 @@ class FsDatasetImpl implements FsDataset
*/
@Override // FsDatasetSpi
public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
+ if (Thread.interrupted()) {
+ // Don't allow data modifications from interrupted threads
+ throw new IOException("Cannot finalize block from Interrupted Thread");
+ }
ReplicaInfo replicaInfo = getReplicaInfo(b);
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
// this is legal, when recovery happens on a file that has
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java?rev=1358798&r1=1358797&r2=1358798&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
Sun Jul 8 17:56:41 2012
@@ -38,21 +38,27 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import
org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -561,4 +567,68 @@ public class TestBlockRecovery {
streams.close();
}
}
+
+ /**
+ * Test to verify the race between finalizeBlock and Lease recovery
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 20000)
+ public void testRaceBetweenReplicaRecoveryAndFinalizeBlock() throws
Exception {
+ tearDown();// Stop the Mocked DN started in startup()
+
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleSingleNN(8020, 50070))
+ .numDataNodes(1).build();
+ try {
+ cluster.waitClusterUp();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ Path path = new Path("/test");
+ FSDataOutputStream out = fs.create(path);
+ out.writeBytes("data");
+ out.hsync();
+
+ List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs.open(path));
+ final LocatedBlock block = blocks.get(0);
+ final DataNode dataNode = cluster.getDataNodes().get(0);
+
+ final AtomicBoolean recoveryInitResult = new AtomicBoolean(true);
+ Thread recoveryThread = new Thread() {
+ public void run() {
+ try {
+ DatanodeInfo[] locations = block.getLocations();
+ final RecoveringBlock recoveringBlock = new RecoveringBlock(
+ block.getBlock(), locations, block.getBlock()
+ .getGenerationStamp() + 1);
+ synchronized (dataNode.data) {
+ Thread.sleep(2000);
+ dataNode.initReplicaRecovery(recoveringBlock);
+ }
+ } catch (Exception e) {
+ recoveryInitResult.set(false);
+ }
+ }
+ };
+ recoveryThread.start();
+ try {
+ out.close();
+ } catch (IOException e) {
+ Assert.assertTrue("Writing should fail",
+ e.getMessage().contains("are bad. Aborting..."));
+ } finally {
+ recoveryThread.join();
+ }
+ Assert.assertTrue("Recovery should be initiated successfully",
+ recoveryInitResult.get());
+
+ dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock()
+ .getGenerationStamp() + 1, block.getBlockSize());
+ } finally {
+ if (null != cluster) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+ }
}