Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 bb5749b1e -> a66d2a2a8


HDFS-9874. Long living DataXceiver threads cause volume shutdown to block. 
Contributed by Rushabh Shah.

(cherry picked from commit 63c966a3fbeb675959fc4101e65de9f57aecd17d)
(cherry picked from commit 242c7f1fee664b4d609a6c72e899f10816430f65)

Conflicts:
        
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
        
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a66d2a2a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a66d2a2a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a66d2a2a

Branch: refs/heads/branch-2.7
Commit: a66d2a2a85e0db9ef164dfcf3a50d3970b51f175
Parents: bb5749b
Author: Kihwal Lee <[email protected]>
Authored: Fri Mar 18 10:38:33 2016 -0500
Committer: Kihwal Lee <[email protected]>
Committed: Fri Mar 18 10:38:33 2016 -0500

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../hdfs/server/datanode/ReplicaInPipeline.java |  7 +++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 13 ++++
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |  6 ++
 .../fsdataset/impl/TestFsDatasetImpl.java       | 66 ++++++++++++++++++++
 5 files changed, 95 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a66d2a2a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 0ed2127..f01e697 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -146,6 +146,9 @@ Release 2.7.3 - UNRELEASED
     HDFS-9904. testCheckpointCancellationDuringUpload occasionally fails.
     (Lin Yiqun via kihwal)
 
+    HDFS-9874. Long living DataXceiver threads cause volume shutdown to block.
+    (Rushabh Shah via kihwal)
+
 Release 2.7.2 - 2016-01-25
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a66d2a2a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index cc55f85..27c46e3 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -173,6 +173,13 @@ public class ReplicaInPipeline extends ReplicaInfo
     this.writer = writer;
   }
   
+  public void interruptThread() {
+    if (writer != null && writer != Thread.currentThread() 
+        && writer.isAlive()) {
+      this.writer.interrupt();
+    }
+  }
+
   @Override  // Object
   public boolean equals(Object o) {
     return super.equals(o);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a66d2a2a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index f4e10f1..4673029 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -3048,5 +3048,18 @@ class FsDatasetImpl implements 
FsDatasetSpi<FsVolumeImpl> {
       s.add(blockId);
     }
   }
+
+  synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) {
+    for (String blockPoolId : volumeMap.getBlockPoolList()) {
+      Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
+      for (ReplicaInfo replicaInfo : replicas) {
+        if (replicaInfo instanceof ReplicaInPipeline
+            && replicaInfo.getVolume().equals(volume)) {
+          ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline) 
replicaInfo;
+          replicaInPipeline.interruptThread();
+        }
+      }
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a66d2a2a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 7b3b6cc..527d471 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -229,6 +229,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
     Preconditions.checkState(reference.getReferenceCount() > 0);
   }
 
+  @VisibleForTesting
+  int getReferenceCount() {
+    return this.reference.getReferenceCount();
+  }
+
   /**
    * Close this volume and wait all other threads to release the reference 
count
    * on this volume.
@@ -237,6 +242,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   void closeAndWait() throws IOException {
     try {
       this.reference.setClosed();
+      dataset.stopAllDataxceiverThreads(this);
     } catch (ClosedChannelException e) {
       throw new IOException("The volume has already closed.", e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a66d2a2a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 536725f..b3827c7 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -20,14 +20,19 @@ package 
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import com.google.common.collect.Lists;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -48,6 +53,7 @@ import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -437,4 +443,64 @@ public class TestFsDatasetImpl {
     assertSame(replica,
         BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica));
   }
+
+  /**
+   * Tests stopping all the active DataXceiver thread on volume failure event.
+   * @throws Exception
+   */
+  @Test
+  public void testCleanShutdownOfVolume() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration config = new HdfsConfiguration();
+      config.setLong(
+          DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000);
+      config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 
1);
+
+      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+      Path filePath = new Path("test.dat");
+      // Create a file and keep the output stream unclosed.
+      FSDataOutputStream out = fs.create(filePath, (short) 1);
+      out.write(1);
+      out.hflush();
+
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+      FsVolumeImpl volume = (FsVolumeImpl) dataNode.getFSDataset().getVolume(
+          block);
+      File finalizedDir = volume.getFinalizedDir(cluster.getNamesystem()
+          .getBlockPoolId());
+
+      if (finalizedDir.exists()) {
+        // Remove write and execute access so that checkDiskErrorThread detects
+        // this volume is bad.
+        finalizedDir.setExecutable(false);
+        finalizedDir.setWritable(false);
+      }
+      Assert.assertTrue("Reference count for the volume should be greater "
+          + "than 0", volume.getReferenceCount() > 0);
+      // Invoke the synchronous checkDiskError method
+      dataNode.getFSDataset().checkDataDir();
+      // Sleep for 1 second so that datanode can interrupt and cluster clean up
+      Thread.sleep(1000);
+      assertEquals("There are active threads still referencing volume: "
+          + volume.getBasePath(), 0, volume.getReferenceCount());
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
+      DatanodeInfo info = lb.getLocations()[0];
+
+      try {
+        out.close();
+        Assert.fail("This is not a valid code path. "
+            + "out.close should have thrown an exception.");
+      } catch (IOException ioe) {
+        Assert.assertTrue(ioe.getMessage().contains(info.toString()));
+      }
+      finalizedDir.setWritable(true);
+      finalizedDir.setExecutable(true);
+    } finally {
+    cluster.shutdown();
+    }
+  }
 }

Reply via email to