HDFS-8496. Calling stopWriter() with FSDatasetImpl lock held may block other threads (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f6b1a818
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f6b1a818
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f6b1a818

Branch: refs/heads/HDFS-7240
Commit: f6b1a818124cc42688c4c5acaf537d96cf00e43b
Parents: f65f5b1
Author: Colin Patrick Mccabe <cmcc...@cloudera.com>
Authored: Mon Apr 4 18:00:26 2016 -0700
Committer: Colin Patrick Mccabe <cmcc...@cloudera.com>
Committed: Mon Apr 4 18:02:15 2016 -0700

----------------------------------------------------------------------
 .../hdfs/server/datanode/ReplicaInPipeline.java |  54 ++++---
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 145 +++++++++++++------
 .../datanode/fsdataset/impl/ReplicaMap.java     |   2 +-
 .../hdfs/server/datanode/TestBlockRecovery.java | 137 ++++++++++++++++--
 4 files changed, 257 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
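
The heart of the change is in ReplicaInPipeline: the plain Thread writer field, previously read and written only while holding the FsDatasetImpl monitor, becomes an AtomicReference<Thread> that is claimed with compareAndSet. Once ownership is tracked atomically, the slow interrupt-and-join in stopWriter() no longer needs any shared lock. A minimal, self-contained sketch of that pattern follows (WriterOwner is an illustrative stand-in, not the actual Hadoop class):

import java.util.concurrent.atomic.AtomicReference;

class WriterOwner {
  private final AtomicReference<Thread> writer = new AtomicReference<Thread>();

  // Claim the writer slot only if it currently holds prevWriter; never blocks.
  boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) {
    return writer.compareAndSet(prevWriter, newWriter);
  }

  // Interrupt and join the current writer. Safe to call with no shared lock
  // held, because the only shared state touched is the atomic reference.
  void stopWriter(long joinTimeoutMs) throws InterruptedException {
    while (true) {
      Thread t = writer.get();
      if (t == null || t == Thread.currentThread() || !t.isAlive()) {
        if (writer.compareAndSet(t, null)) {
          return;    // slot cleared, or it was already ours
        }
        continue;    // another thread raced in; re-examine the new writer
      }
      t.interrupt();
      t.join(joinTimeoutMs);  // the potentially slow wait, done lock-free
    }
  }
}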


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b1a818/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 5caca15..7326846 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -22,6 +22,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.RandomAccessFile;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -44,7 +45,7 @@ public class ReplicaInPipeline extends ReplicaInfo
   private long bytesAcked;
   private long bytesOnDisk;
   private byte[] lastChecksum;  
-  private Thread writer;
+  private AtomicReference<Thread> writer = new AtomicReference<Thread>();
 
   /**
    * Bytes reserved for this replica on the containing volume.
@@ -97,7 +98,7 @@ public class ReplicaInPipeline extends ReplicaInfo
     super( blockId, len, genStamp, vol, dir);
     this.bytesAcked = len;
     this.bytesOnDisk = len;
-    this.writer = writer;
+    this.writer.set(writer);
     this.bytesReserved = bytesToReserve;
     this.originalBytesReserved = bytesToReserve;
   }
@@ -110,7 +111,7 @@ public class ReplicaInPipeline extends ReplicaInfo
     super(from);
     this.bytesAcked = from.getBytesAcked();
     this.bytesOnDisk = from.getBytesOnDisk();
-    this.writer = from.writer;
+    this.writer.set(from.writer.get());
     this.bytesReserved = from.bytesReserved;
     this.originalBytesReserved = from.originalBytesReserved;
   }
@@ -175,18 +176,11 @@ public class ReplicaInPipeline extends ReplicaInfo
     return new ChunkChecksum(getBytesOnDisk(), lastChecksum);
   }
 
-  /**
-   * Set the thread that is writing to this replica
-   * @param writer a thread writing to this replica
-   */
-  public void setWriter(Thread writer) {
-    this.writer = writer;
-  }
-  
   public void interruptThread() {
-    if (writer != null && writer != Thread.currentThread() 
-        && writer.isAlive()) {
-      this.writer.interrupt();
+    Thread thread = writer.get();
+    if (thread != null && thread != Thread.currentThread() 
+        && thread.isAlive()) {
+      thread.interrupt();
     }
   }
 
@@ -196,17 +190,35 @@ public class ReplicaInPipeline extends ReplicaInfo
   }
   
   /**
+   * Attempt to set the writer to a new value.
+   */
+  public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) {
+    return writer.compareAndSet(prevWriter, newWriter);
+  }
+
+  /**
    * Interrupt the writing thread and wait until it dies
    * @throws IOException the waiting is interrupted
    */
   public void stopWriter(long xceiverStopTimeout) throws IOException {
-    if (writer != null && writer != Thread.currentThread() && writer.isAlive()) {
-      writer.interrupt();
+    while (true) {
+      Thread thread = writer.get();
+      if ((thread == null) || (thread == Thread.currentThread()) ||
+          (!thread.isAlive())) {
+        if (writer.compareAndSet(thread, null) == true) {
+          return; // Done
+        }
+        // The writer changed.  Go back to the start of the loop and attempt to
+        // stop the new writer.
+        continue;
+      }
+      thread.interrupt();
       try {
-        writer.join(xceiverStopTimeout);
-        if (writer.isAlive()) {
-          final String msg = "Join on writer thread " + writer + " timed out";
-          DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(writer));
+        thread.join(xceiverStopTimeout);
+        if (thread.isAlive()) {
+          // Our thread join timed out.
+          final String msg = "Join on writer thread " + thread + " timed out";
+          DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(thread));
           throw new IOException(msg);
         }
       } catch (InterruptedException e) {
@@ -214,7 +226,7 @@ public class ReplicaInPipeline extends ReplicaInfo
       }
     }
   }
-  
+
   @Override  // Object
   public int hashCode() {
     return super.hashCode();
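
Why the old stopWriter() was a problem: it interrupted and joined the writer thread while the caller held the FsDatasetImpl monitor, so a writer that was slow to exit stalled every other dataset operation for up to the xceiver stop timeout. A self-contained demo of that failure mode (illustrative only; the sleep stands in for a slow-to-die writer):

public class LockHeldJoinDemo {
  public static void main(String[] args) throws InterruptedException {
    final Object datasetLock = new Object();
    final Thread slowWriter = new Thread(() -> {
      try {
        Thread.sleep(10_000);          // a writer that takes 10s to exit
      } catch (InterruptedException ignored) {
      }
    });
    slowWriter.start();
    new Thread(() -> {
      synchronized (datasetLock) {     // the old code joined under this lock
        try {
          slowWriter.join();
        } catch (InterruptedException ignored) {
        }
      }
    }).start();
    Thread.sleep(100);                 // let the joining thread take the lock
    long start = System.currentTimeMillis();
    synchronized (datasetLock) {       // any unrelated dataset operation
      System.out.println("waited " + (System.currentTimeMillis() - start)
          + " ms for the lock");       // prints roughly 10000 ms
    }
  }
}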

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b1a818/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index d6a0df6..240345c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1207,8 +1207,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return newReplicaInfo;
   }
 
+  private static class MustStopExistingWriter extends Exception {
+    private final ReplicaInPipeline rip;
+
+    MustStopExistingWriter(ReplicaInPipeline rip) {
+      this.rip = rip;
+    }
+
+    ReplicaInPipeline getReplica() {
+      return rip;
+    }
+  }
+
   private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS, 
-      long expectedBlockLen) throws IOException {
+      long expectedBlockLen) throws IOException, MustStopExistingWriter {
     ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
     
     // check state
@@ -1232,9 +1244,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     long replicaLen = replicaInfo.getNumBytes();
     if (replicaInfo.getState() == ReplicaState.RBW) {
       ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
-      // kill the previous writer
-      rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
-      rbw.setWriter(Thread.currentThread());
+      if (!rbw.attemptToSetWriter(null, Thread.currentThread())) {
+        throw new MustStopExistingWriter(rbw);
+      }
+      // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
       if (replicaLen != rbw.getBytesOnDisk() 
           || replicaLen != rbw.getBytesAcked()) {
@@ -1260,39 +1272,55 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
     LOG.info("Recover failed append to " + b);
 
-    ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
+    while (true) {
+      try {
+        synchronized (this) {
+          ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
 
-    FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
-    ReplicaBeingWritten replica;
-    try {
-      // change the replica's state/gs etc.
-      if (replicaInfo.getState() == ReplicaState.FINALIZED) {
-        replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo,
-                         newGS, b.getNumBytes());
-      } else { //RBW
-        bumpReplicaGS(replicaInfo, newGS);
-        replica = (ReplicaBeingWritten) replicaInfo;
+          FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
+          ReplicaBeingWritten replica;
+          try {
+            // change the replica's state/gs etc.
+            if (replicaInfo.getState() == ReplicaState.FINALIZED) {
+              replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo,
+                               newGS, b.getNumBytes());
+            } else { //RBW
+              bumpReplicaGS(replicaInfo, newGS);
+              replica = (ReplicaBeingWritten) replicaInfo;
+            }
+          } catch (IOException e) {
+            IOUtils.cleanup(null, ref);
+            throw e;
+          }
+          return new ReplicaHandler(replica, ref);
+        }
+      } catch (MustStopExistingWriter e) {
+        e.getReplica().stopWriter(datanode.getDnConf().getXceiverStopTimeout());
       }
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
     }
-    return new ReplicaHandler(replica, ref);
   }
 
   @Override // FsDatasetSpi
   public synchronized Replica recoverClose(ExtendedBlock b, long newGS,
       long expectedBlockLen) throws IOException {
     LOG.info("Recover failed close " + b);
-    // check replica's state
-    ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
-    // bump the replica's GS
-    bumpReplicaGS(replicaInfo, newGS);
-    // finalize the replica if RBW
-    if (replicaInfo.getState() == ReplicaState.RBW) {
-      finalizeReplica(b.getBlockPoolId(), replicaInfo);
+    while (true) {
+      try {
+        synchronized (this) {
+          // check replica's state
+          ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
+          // bump the replica's GS
+          bumpReplicaGS(replicaInfo, newGS);
+          // finalize the replica if RBW
+          if (replicaInfo.getState() == ReplicaState.RBW) {
+            finalizeReplica(b.getBlockPoolId(), replicaInfo);
+          }
+          return replicaInfo;
+        }
+      } catch (MustStopExistingWriter e) {
+        e.getReplica().stopWriter(datanode.getDnConf().getXceiverStopTimeout());
+      }
     }
-    return replicaInfo;
   }
   
   /**
@@ -1384,26 +1412,37 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaHandler recoverRbw(
+  public ReplicaHandler recoverRbw(
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
     LOG.info("Recover RBW replica " + b);
 
-    ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
-    
-    // check the replica's state
-    if (replicaInfo.getState() != ReplicaState.RBW) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
+    while (true) {
+      try {
+        synchronized (this) {
+          ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
+          
+          // check the replica's state
+          if (replicaInfo.getState() != ReplicaState.RBW) {
+            throw new ReplicaNotFoundException(
+                ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
+          }
+          ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+          if (!rbw.attemptToSetWriter(null, Thread.currentThread())) {
+            throw new MustStopExistingWriter(rbw);
+          }
+          LOG.info("Recovering " + rbw);
+          return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd);
+        }
+      } catch (MustStopExistingWriter e) {
+        e.getReplica().stopWriter(datanode.getDnConf().getXceiverStopTimeout());
+      }
     }
-    ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
-    
-    LOG.info("Recovering " + rbw);
-
-    // Stop the previous writer
-    rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
-    rbw.setWriter(Thread.currentThread());
+  }
 
+  private synchronized ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
+      ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
+      throws IOException {
     // check generation stamp
     long replicaGenerationStamp = rbw.getGenerationStamp();
     if (replicaGenerationStamp < b.getGenerationStamp() ||
@@ -1419,7 +1458,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     long numBytes = rbw.getNumBytes();
     if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){
       throw new ReplicaNotFoundException("Unmatched length replica " + 
-          replicaInfo + ": BytesAcked = " + bytesAcked + 
+          rbw + ": BytesAcked = " + bytesAcked + 
           " BytesRcvd = " + numBytes + " are not in the range of [" + 
           minBytesRcvd + ", " + maxBytesRcvd + "].");
     }
@@ -2351,8 +2390,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaRecoveryInfo initReplicaRecovery(
-      RecoveringBlock rBlock) throws IOException {
+  public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+      throws IOException {
     return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(), volumeMap,
         rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp(),
         datanode.getDnConf().getXceiverStopTimeout());
@@ -2361,6 +2400,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   /** static version of {@link #initReplicaRecovery(RecoveringBlock)}. */
   static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
       Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
+    while (true) {
+      try {
+        synchronized (map.getMutex()) {
+          return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
+        }
+      } catch (MustStopExistingWriter e) {
+        e.getReplica().stopWriter(xceiverStopTimeout);
+      }
+    }
+  }
+
+  static ReplicaRecoveryInfo initReplicaRecoveryImpl(String bpid, ReplicaMap map,
+      Block block, long recoveryId)
+          throws IOException, MustStopExistingWriter {
     final ReplicaInfo replica = map.get(bpid, block.getBlockId());
     LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
         + ", replica=" + replica);
@@ -2373,7 +2426,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     //stop writer if there is any
     if (replica instanceof ReplicaInPipeline) {
       final ReplicaInPipeline rip = (ReplicaInPipeline)replica;
-      rip.stopWriter(xceiverStopTimeout);
+      if (!rip.attemptToSetWriter(null, Thread.currentThread())) {
+        throw new MustStopExistingWriter(rip);
+      }
 
       //check replica bytes on disk.
       if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
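
All the entry points above (recoverAppend, recoverClose, recoverRbw, and the static initReplicaRecovery) now share the same retry shape: claim the writer slot inside the lock, and if another writer owns the replica, throw MustStopExistingWriter, stop that writer outside the lock, and retry. A condensed sketch of the shape, reusing the WriterOwner stand-in from the earlier sketch (the recovery body here is a placeholder, not the real logic):

class RecoveryRetrySketch {
  static class MustStopExistingWriter extends Exception {
    final WriterOwner replica;
    MustStopExistingWriter(WriterOwner replica) { this.replica = replica; }
  }

  private final WriterOwner replica = new WriterOwner();
  private final long stopTimeoutMs = 60_000;

  String recoverWithRetry() throws InterruptedException {
    while (true) {
      try {
        synchronized (this) {          // held only for the quick checks
          if (!replica.attemptToSetWriter(null, Thread.currentThread())) {
            throw new MustStopExistingWriter(replica);
          }
          return "recovered";          // placeholder for the real work
        }
      } catch (MustStopExistingWriter e) {
        // The slow interrupt-and-join happens here, with no lock held.
        e.replica.stopWriter(stopTimeoutMs);
      }
    }
  }
}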

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b1a818/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 34c9f2e..192702e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -233,7 +233,7 @@ class ReplicaMap {
    * Give access to mutex used for synchronizing ReplicasMap
    * @return object used as lock
    */
-  Object getMutext() {
+  Object getMutex() {
     return mutex;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b1a818/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 7e2bc0a..751089f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -42,10 +42,12 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.collect.Iterators;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -92,7 +94,9 @@ import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -124,6 +128,9 @@ public class TestBlockRecovery {
   private final static ExtendedBlock block = new ExtendedBlock(POOL_ID,
       BLOCK_ID, BLOCK_LEN, GEN_STAMP);
 
+  @Rule
+  public TestName currentTestName = new TestName();
+
   private static final int CELL_SIZE =
       StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
   private static final int bytesPerChecksum = 512;
@@ -153,6 +160,9 @@ public class TestBlockRecovery {
     GenericTestUtils.setLogLevel(LOG, Level.ALL);
   }
 
+  private final long
+      TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS = 1000000000L;
+
   /**
    * Starts an instance of DataNode
    * @throws IOException
@@ -165,6 +175,12 @@ public class TestBlockRecovery {
     conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
     conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
     conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
+    if (currentTestName.getMethodName().equals(
+        "testInitReplicaRecoveryDoesNotHogLock")) {
+      // This test requires a very long value for the xceiver stop timeout.
+      conf.setLong(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
+          TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS);
+    }
     conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
     FileSystem.setDefaultUri(conf,
         "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
@@ -297,7 +313,7 @@ public class TestBlockRecovery {
    * Two replicas are in Finalized state
    * @throws IOException in case of an error
    */
-  @Test
+  @Test(timeout=60000)
   public void testFinalizedReplicas () throws IOException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
@@ -336,7 +352,7 @@ public class TestBlockRecovery {
    * One replica is Finalized and another is RBW. 
    * @throws IOException in case of an error
    */
-  @Test
+  @Test(timeout=60000)
   public void testFinalizedRbwReplicas() throws IOException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
@@ -378,7 +394,7 @@ public class TestBlockRecovery {
    * One replica is Finalized and another is RWR. 
    * @throws IOException in case of an error
    */
-  @Test
+  @Test(timeout=60000)
   public void testFinalizedRwrReplicas() throws IOException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
@@ -420,7 +436,7 @@ public class TestBlockRecovery {
    * Two replicas are RBW.
    * @throws IOException in case of an error
    */
-  @Test
+  @Test(timeout=60000)
   public void testRBWReplicas() throws IOException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
@@ -444,7 +460,7 @@ public class TestBlockRecovery {
    * One replica is RBW and another is RWR. 
    * @throws IOException in case of an error
    */
-  @Test
+  @Test(timeout=60000)
   public void testRBW_RWRReplicas() throws IOException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
@@ -468,7 +484,7 @@ public class TestBlockRecovery {
    * Two replicas are RWR.
    * @throws IOException in case of an error
    */
-  @Test
+  @Test(timeout=60000)
   public void testRWRReplicas() throws IOException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
@@ -504,7 +520,7 @@ public class TestBlockRecovery {
    * @throws IOException
    *           in case of an error
    */
-  @Test
+  @Test(timeout=60000)
   public void testRecoveryInProgressException()
     throws IOException, InterruptedException {
     if(LOG.isDebugEnabled()) {
@@ -529,7 +545,7 @@ public class TestBlockRecovery {
    * @throws IOException
    *           in case of an error
    */
-  @Test
+  @Test(timeout=60000)
   public void testErrorReplicas() throws IOException, InterruptedException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
@@ -556,7 +572,7 @@ public class TestBlockRecovery {
    *
    * @throws IOException in case of an error
    */
-  @Test
+  @Test(timeout=60000)
   public void testZeroLenReplicas() throws IOException, InterruptedException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
@@ -596,7 +612,7 @@ public class TestBlockRecovery {
    *
    * @throws IOException in case of an error
    */
-  @Test
+  @Test(timeout=60000)
   public void testFailedReplicaUpdate() throws IOException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
@@ -618,7 +634,7 @@ public class TestBlockRecovery {
    *
    * @throws IOException in case of an error
    */
-  @Test
+  @Test(timeout=60000)
   public void testNoReplicaUnderRecovery() throws IOException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
@@ -643,7 +659,7 @@ public class TestBlockRecovery {
    *
    * @throws IOException in case of an error
    */
-  @Test
+  @Test(timeout=60000)
   public void testNotMatchedReplicaID() throws IOException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
@@ -744,7 +760,7 @@ public class TestBlockRecovery {
    * throw an exception.
    * @throws Exception
    */
-  @Test
+  @Test(timeout=60000)
   public void testRURReplicas() throws Exception {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
@@ -775,7 +791,7 @@ public class TestBlockRecovery {
     }
   }
 
-  @Test
+  @Test(timeout=60000)
   public void testSafeLength() throws Exception {
     // hard coded policy to work with hard coded test suite
     ErasureCodingPolicy ecPolicy = ErasureCodingPolicyManager
@@ -799,4 +815,97 @@ public class TestBlockRecovery {
           recoveryTask.getSafeLength(syncList));
     }
   }
+
+  /**
+   * Test that initReplicaRecovery does not hold the lock for an unreasonable
+   * amount of time if a writer is taking a long time to stop.
+   */
+  @Test(timeout=60000)
+  public void testInitReplicaRecoveryDoesNotHogLock() throws Exception {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Running " + GenericTestUtils.getMethodName());
+    }
+    // We need a long value for the data xceiver stop timeout.
+    // Otherwise the timeout will trigger, and we will not have tested that
+    // thread join was done locklessly.
+    Assert.assertEquals(
+        TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS,
+        dn.getDnConf().getXceiverStopTimeout());
+    final Semaphore progressParent = new Semaphore(0);
+    final Semaphore terminateSlowWorker = new Semaphore(0);
+    final AtomicBoolean failure = new AtomicBoolean(false);
+    Collection<RecoveringBlock> recoveringBlocks =
+        initRecoveringBlocks();
+    final RecoveringBlock recoveringBlock =
+        Iterators.get(recoveringBlocks.iterator(), 0);
+    final ExtendedBlock block = recoveringBlock.getBlock();
+    Thread slowWorker = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          // Register this thread as the writer for the recoveringBlock.
+          LOG.debug("slowWorker creating rbw");
+          ReplicaHandler replicaHandler =
+              spyDN.data.createRbw(StorageType.DISK, block, false);
+          replicaHandler.close();
+          LOG.debug("slowWorker created rbw");
+          // Tell the parent thread to start progressing.
+          progressParent.release();
+          while (true) {
+            try {
+              terminateSlowWorker.acquire();
+              break;
+            } catch (InterruptedException e) {
+              // Ignore interrupted exceptions so that the waitingWorker thread
+              // will have to wait for us.
+            }
+          }
+          LOG.debug("slowWorker exiting");
+        } catch (Throwable t) {
+          LOG.error("slowWorker got exception", t);
+          failure.set(true);
+        }
+      }
+    });
+    // Start the slow worker thread and wait for it to take ownership of the
+    // ReplicaInPipeline
+    slowWorker.start();
+    while (true) {
+      try {
+        progressParent.acquire();
+        break;
+      } catch (InterruptedException e) {
+        // Ignore interrupted exceptions
+      }
+    }
+
+    // Start a worker thread which will wait for the slow worker thread.
+    Thread waitingWorker = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          // Attempt to terminate the other worker thread and take ownership
+          // of the ReplicaInPipeline.
+          LOG.debug("waitingWorker initiating recovery");
+          spyDN.initReplicaRecovery(recoveringBlock);
+          LOG.debug("waitingWorker initiated recovery");
+        } catch (Throwable t) {
+          GenericTestUtils.assertExceptionContains("meta does not exist", t);
+        }
+      }
+    });
+    waitingWorker.start();
+
+    // Do an operation that requires the lock.  This should not be blocked
+    // by the replica recovery in progress.
+    spyDN.getFSDataset().getReplicaString(
+        recoveringBlock.getBlock().getBlockPoolId(),
+        recoveringBlock.getBlock().getBlockId());
+
+    // Wait for the two worker threads to exit normally.
+    terminateSlowWorker.release();
+    slowWorker.join();
+    waitingWorker.join();
+    Assert.assertFalse("The slowWriter thread failed.", failure.get());
+  }
 }
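
The new test relies on a simple semaphore handshake: the slow worker signals once it owns the replica, the main thread then verifies that a lock-protected operation (getReplicaString) still makes progress while initReplicaRecovery is blocked joining the worker, and a second semaphore finally lets the worker exit. A minimal sketch of that handshake (the shared resource and the assertions are elided; the test itself loops on acquire() where this sketch uses acquireUninterruptibly()):

import java.util.concurrent.Semaphore;

public class HandshakeSketch {
  public static void main(String[] args) throws InterruptedException {
    final Semaphore progressParent = new Semaphore(0);
    final Semaphore terminateSlowWorker = new Semaphore(0);
    Thread slowWorker = new Thread(() -> {
      // ... take ownership of the shared resource here ...
      progressParent.release();                     // signal the main thread
      terminateSlowWorker.acquireUninterruptibly(); // hold on until released
    });
    slowWorker.start();
    progressParent.acquireUninterruptibly();  // wait for the ownership handoff
    // ... assert that lock-protected operations still complete here ...
    terminateSlowWorker.release();
    slowWorker.join();
  }
}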
