Repository: hadoop
Updated Branches:
refs/heads/branch-2.8 73b5a44b0 -> 242c7f1fe
HDFS-9874. Long living DataXceiver threads cause volume shutdown to block.
Contributed by Rushabh Shah.
(cherry picked from commit 63c966a3fbeb675959fc4101e65de9f57aecd17d)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/242c7f1f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/242c7f1f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/242c7f1f
Branch: refs/heads/branch-2.8
Commit: 242c7f1fee664b4d609a6c72e899f10816430f65
Parents: 73b5a44
Author: Kihwal Lee <[email protected]>
Authored: Fri Mar 18 10:33:13 2016 -0500
Committer: Kihwal Lee <[email protected]>
Committed: Fri Mar 18 10:33:13 2016 -0500
----------------------------------------------------------------------
.../hdfs/server/datanode/ReplicaInPipeline.java | 7 +++
.../datanode/fsdataset/impl/FsDatasetImpl.java | 13 ++++
.../datanode/fsdataset/impl/FsVolumeImpl.java | 6 ++
.../fsdataset/impl/TestFsDatasetImpl.java | 66 ++++++++++++++++++++
4 files changed, 92 insertions(+)
----------------------------------------------------------------------
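In brief, the fix: when a volume is taken out of service, the DataNode now interrupts every DataXceiver writer thread that still holds a replica on that volume, so a long-living writer can no longer pin the volume's reference count and block the shutdown. The sketch below is a minimal, self-contained illustration of that pattern only; Volume and Replica are hypothetical stand-ins, not the real FsVolumeImpl, FsDatasetImpl, and ReplicaInPipeline classes changed in the diffs that follow.

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ReplicaInPipeline.
class Replica {
  private volatile Thread writer;

  void setWriter(Thread t) { writer = t; }

  // Same guard as the new ReplicaInPipeline.interruptThread(): never
  // interrupt the calling thread, and skip writers that already exited.
  void interruptWriter() {
    Thread w = writer;
    if (w != null && w != Thread.currentThread() && w.isAlive()) {
      w.interrupt();
    }
  }
}

// Hypothetical stand-in for FsVolumeImpl plus the dataset-side scan.
class Volume {
  private final List<Replica> inPipeline = new ArrayList<>();

  void track(Replica r) { inPipeline.add(r); }

  // Mirrors the new call in FsVolumeImpl.setClosed(): once the volume is
  // marked closed, interrupt every writer still referencing it so the
  // reference count can drain and shutdown is not blocked.
  void setClosed() {
    for (Replica r : inPipeline) {
      r.interruptWriter();
    }
  }
}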
http://git-wip-us.apache.org/repos/asf/hadoop/blob/242c7f1f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index d9406f0..5caca15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -183,6 +183,13 @@ public class ReplicaInPipeline extends ReplicaInfo
     this.writer = writer;
   }
 
+  public void interruptThread() {
+    if (writer != null && writer != Thread.currentThread()
+        && writer.isAlive()) {
+      this.writer.interrupt();
+    }
+  }
+
   @Override // Object
   public boolean equals(Object o) {
     return super.equals(o);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/242c7f1f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 163c8d0..2f16fc5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -3152,5 +3152,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     evictLazyPersistBlocks(bytesNeeded);
     return cacheManager.reserve(bytesNeeded) > 0;
   }
+
+  synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) {
+    for (String blockPoolId : volumeMap.getBlockPoolList()) {
+      Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
+      for (ReplicaInfo replicaInfo : replicas) {
+        if (replicaInfo instanceof ReplicaInPipeline
+            && replicaInfo.getVolume().equals(volume)) {
+          ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline) replicaInfo;
+          replicaInPipeline.interruptThread();
+        }
+      }
+    }
+  }
 
 }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/242c7f1f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index e02c293..ca7610d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -239,6 +239,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
     Preconditions.checkState(reference.getReferenceCount() > 0);
   }
 
+  @VisibleForTesting
+  int getReferenceCount() {
+    return this.reference.getReferenceCount();
+  }
+
   /**
    * Close this volume.
    * @throws IOException if the volume is closed.
@@ -246,6 +251,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   void setClosed() throws IOException {
     try {
       this.reference.setClosed();
+      dataset.stopAllDataxceiverThreads(this);
     } catch (ClosedChannelException e) {
       throw new IOException("The volume has already closed.", e);
     }
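The setClosed() hook above works because a writer blocked in an interruptible call (sleep, wait, or NIO channel I/O) observes the interrupt as an exception, unwinds, and releases its volume reference on the way out. A tiny standalone demo of that JDK-level behavior, independent of HDFS (InterruptDemo is a made-up name for illustration):

public class InterruptDemo {
  public static void main(String[] args) throws Exception {
    Thread writer = new Thread(() -> {
      try {
        // Stand-in for a DataXceiver blocked waiting on a slow client.
        Thread.sleep(60_000);
      } catch (InterruptedException e) {
        // The interrupt lands here; a real writer would close its replica
        // and release the volume reference before exiting.
        System.out.println("writer interrupted, releasing volume reference");
      }
    });
    writer.start();
    writer.interrupt();  // what interruptThread() does to live writers
    writer.join();       // returns promptly instead of after 60 seconds
  }
}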
http://git-wip-us.apache.org/repos/asf/hadoop/blob/242c7f1f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index ec526d3..67808bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -21,14 +21,19 @@ import com.google.common.collect.Lists;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -50,6 +55,7 @@ import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -525,4 +531,64 @@ public class TestFsDatasetImpl {
     LOG.info("Volumes removed");
     brReceivedLatch.await();
   }
+
+  /**
+   * Tests stopping all the active DataXceiver threads on a volume failure event.
+   * @throws Exception
+   */
+  @Test
+  public void testCleanShutdownOfVolume() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration config = new HdfsConfiguration();
+      config.setLong(
+          DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000);
+      config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+
+      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+      Path filePath = new Path("test.dat");
+      // Create a file and keep the output stream unclosed.
+      FSDataOutputStream out = fs.create(filePath, (short) 1);
+      out.write(1);
+      out.hflush();
+
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+      FsVolumeImpl volume = (FsVolumeImpl) dataNode.getFSDataset().getVolume(
+          block);
+      File finalizedDir = volume.getFinalizedDir(cluster.getNamesystem()
+          .getBlockPoolId());
+
+      if (finalizedDir.exists()) {
+        // Remove write and execute access so that checkDiskErrorThread detects
+        // this volume is bad.
+        finalizedDir.setExecutable(false);
+        finalizedDir.setWritable(false);
+      }
+      Assert.assertTrue("Reference count for the volume should be greater "
+          + "than 0", volume.getReferenceCount() > 0);
+      // Invoke the synchronous checkDiskError method
+      dataNode.getFSDataset().checkDataDir();
+      // Sleep for 1 second so that datanode can interrupt and cluster clean up
+      Thread.sleep(1000);
+      assertEquals("There are active threads still referencing volume: "
+          + volume.getBasePath(), 0, volume.getReferenceCount());
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
+      DatanodeInfo info = lb.getLocations()[0];
+
+      try {
+        out.close();
+        Assert.fail("This is not a valid code path. "
+            + "out.close should have thrown an exception.");
+      } catch (IOException ioe) {
+        Assert.assertTrue(ioe.getMessage().contains(info.toString()));
+      }
+      finalizedDir.setWritable(true);
+      finalizedDir.setExecutable(true);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }