Repository: hadoop
Updated Branches:
refs/heads/branch-2 3584cbc35 -> b5e6ad457
HDFS-10267. Extra "synchronized" on FsDatasetImpl#recoverAppend and
FsDatasetImpl#recoverClose
(cherry picked from commit 4bd7cbc29d142fc56324156333b9a8a7d7b68042)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b5e6ad45
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b5e6ad45
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b5e6ad45
Branch: refs/heads/branch-2
Commit: b5e6ad457f39dd5f2176de700612e2757b3c1812
Parents: 3584cbc
Author: Colin Patrick Mccabe <[email protected]>
Authored: Wed Apr 6 12:36:54 2016 -0700
Committer: Colin Patrick Mccabe <[email protected]>
Committed: Wed Apr 6 21:08:42 2016 -0700
----------------------------------------------------------------------
.../datanode/fsdataset/impl/FsDatasetImpl.java | 4 +-
.../hdfs/server/datanode/TestBlockRecovery.java | 234 ++++++++++++++-----
2 files changed, 180 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5e6ad45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index e728ad7..f9620e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1273,7 +1273,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
- public synchronized ReplicaHandler recoverAppend(
+ public ReplicaHandler recoverAppend(
ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
LOG.info("Recover failed append to " + b);
@@ -1306,7 +1306,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
- public synchronized Replica recoverClose(ExtendedBlock b, long newGS,
+ public Replica recoverClose(ExtendedBlock b, long newGS,
long expectedBlockLen) throws IOException {
LOG.info("Recover failed close " + b);
while (true) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5e6ad45/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 41807e7..513e9a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -42,8 +42,10 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Iterators;
import org.apache.commons.logging.Log;
@@ -84,6 +86,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
@@ -130,7 +133,7 @@ public class TestBlockRecovery {
}
private final long
- TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS = 1000000000L;
+ TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS = 1000000000L;
/**
* Starts an instance of DataNode
@@ -143,11 +146,10 @@ public class TestBlockRecovery {
conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
- if (currentTestName.getMethodName().equals(
- "testInitReplicaRecoveryDoesNotHogLock")) {
+ if (currentTestName.getMethodName().contains("DoesNotHoldLock")) {
// This test requires a very long value for the xceiver stop timeout.
conf.setLong(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
- TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS);
+ TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS);
}
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
FileSystem.setDefaultUri(conf,
@@ -759,96 +761,216 @@ public class TestBlockRecovery {
}
}
+ private static class TestStopWorkerSemaphore {
+ final Semaphore sem;
+
+ final AtomicBoolean gotInterruption = new AtomicBoolean(false);
+
+ TestStopWorkerSemaphore() {
+ this.sem = new Semaphore(0);
+ }
+
+ /**
+ * Attempt to acquire a semaphore within a given timeout.
+ *
+ * This is useful for unit tests where we need to ignore InterruptedException
+ * when attempting to take a semaphore, but still want to honor the overall
+ * test timeout.
+ *
+ * @param timeoutMs The timeout in milliseconds.
+ */
+ private void uninterruptiblyAcquire(long timeoutMs) throws Exception {
+ long startTimeMs = Time.monotonicNow();
+ while (true) {
+ long remTime = startTimeMs + timeoutMs - Time.monotonicNow();
+ if (remTime < 0) {
+ throw new RuntimeException("Failed to acquire the semaphore within " +
+ timeoutMs + " milliseconds.");
+ }
+ try {
+ if (sem.tryAcquire(1, remTime, TimeUnit.MILLISECONDS)) {
+ return;
+ }
+ } catch (InterruptedException e) {
+ gotInterruption.set(true);
+ }
+ }
+ }
+ }
+
+ private interface TestStopWorkerRunnable {
+ /**
+ * Return the name of the operation that this runnable performs.
+ */
+ String opName();
+
+ /**
+ * Perform the operation.
+ */
+ void run(RecoveringBlock recoveringBlock) throws Exception;
+ }
+
+ @Test(timeout=90000)
+ public void testInitReplicaRecoveryDoesNotHoldLock() throws Exception {
+ testStopWorker(new TestStopWorkerRunnable() {
+ @Override
+ public String opName() {
+ return "initReplicaRecovery";
+ }
+
+ @Override
+ public void run(RecoveringBlock recoveringBlock) throws Exception {
+ try {
+ spyDN.initReplicaRecovery(recoveringBlock);
+ } catch (Exception e) {
+ if (!e.getMessage().contains("meta does not exist")) {
+ throw e;
+ }
+ }
+ }
+ });
+ }
+
+ @Test(timeout=90000)
+ public void testRecoverAppendDoesNotHoldLock() throws Exception {
+ testStopWorker(new TestStopWorkerRunnable() {
+ @Override
+ public String opName() {
+ return "recoverAppend";
+ }
+
+ @Override
+ public void run(RecoveringBlock recoveringBlock) throws Exception {
+ try {
+ ExtendedBlock extBlock = recoveringBlock.getBlock();
+ spyDN.getFSDataset().recoverAppend(extBlock,
+ extBlock.getGenerationStamp() + 1, extBlock.getNumBytes());
+ } catch (Exception e) {
+ if (!e.getMessage().contains(
+ "Corrupted replica ReplicaBeingWritten")) {
+ throw e;
+ }
+ }
+ }
+ });
+ }
+
+ @Test(timeout=90000)
+ public void testRecoverCloseDoesNotHoldLock() throws Exception {
+ testStopWorker(new TestStopWorkerRunnable() {
+ @Override
+ public String opName() {
+ return "recoverClose";
+ }
+
+ @Override
+ public void run(RecoveringBlock recoveringBlock) throws Exception {
+ try {
+ ExtendedBlock extBlock = recoveringBlock.getBlock();
+ spyDN.getFSDataset().recoverClose(extBlock,
+ extBlock.getGenerationStamp() + 1, extBlock.getNumBytes());
+ } catch (Exception e) {
+ if (!e.getMessage().contains(
+ "Corrupted replica ReplicaBeingWritten")) {
+ throw e;
+ }
+ }
+ }
+ });
+ }
+
/**
- * Test that initReplicaRecovery does not hold the lock for an unreasonable
- * amount of time if a writer is taking a long time to stop.
+ * Test that an FsDatasetImpl operation does not hold the lock for an
+ * unreasonable amount of time if a writer is taking a long time to stop.
*/
- @Test(timeout=60000)
- public void testInitReplicaRecoveryDoesNotHogLock() throws Exception {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Running " + GenericTestUtils.getMethodName());
- }
+ private void testStopWorker(final TestStopWorkerRunnable tswr)
+ throws Exception {
+ LOG.debug("Running " + currentTestName.getMethodName());
// We need a long value for the data xceiver stop timeout.
// Otherwise the timeout will trigger, and we will not have tested that
// thread join was done locklessly.
Assert.assertEquals(
- TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS,
+ TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS,
dn.getDnConf().getXceiverStopTimeout());
- final Semaphore progressParent = new Semaphore(0);
- final Semaphore terminateSlowWorker = new Semaphore(0);
- final AtomicBoolean failure = new AtomicBoolean(false);
+ final TestStopWorkerSemaphore progressParent =
+ new TestStopWorkerSemaphore();
+ final TestStopWorkerSemaphore terminateSlowWriter =
+ new TestStopWorkerSemaphore();
+ final AtomicReference<String> failure =
+ new AtomicReference<String>(null);
Collection<RecoveringBlock> recoveringBlocks =
initRecoveringBlocks();
final RecoveringBlock recoveringBlock =
Iterators.get(recoveringBlocks.iterator(), 0);
final ExtendedBlock block = recoveringBlock.getBlock();
- Thread slowWorker = new Thread(new Runnable() {
+ Thread slowWriterThread = new Thread(new Runnable() {
@Override
public void run() {
try {
// Register this thread as the writer for the recoveringBlock.
- LOG.debug("slowWorker creating rbw");
+ LOG.debug("slowWriter creating rbw");
ReplicaHandler replicaHandler =
spyDN.data.createRbw(StorageType.DISK, block, false);
replicaHandler.close();
- LOG.debug("slowWorker created rbw");
+ LOG.debug("slowWriter created rbw");
// Tell the parent thread to start progressing.
- progressParent.release();
- while (true) {
- try {
- terminateSlowWorker.acquire();
- break;
- } catch (InterruptedException e) {
- // Ignore interrupted exceptions so that the waitingWorker thread
- // will have to wait for us.
- }
- }
- LOG.debug("slowWorker exiting");
+ progressParent.sem.release();
+ terminateSlowWriter.uninterruptiblyAcquire(60000);
+ LOG.debug("slowWriter exiting");
} catch (Throwable t) {
- LOG.error("slowWorker got exception", t);
- failure.set(true);
+ LOG.error("slowWriter got exception", t);
+ failure.compareAndSet(null, "slowWriter got exception " +
+ t.getMessage());
}
}
});
// Start the slow worker thread and wait for it to take ownership of the
// ReplicaInPipeline
- slowWorker.start();
- while (true) {
- try {
- progressParent.acquire();
- break;
- } catch (InterruptedException e) {
- // Ignore interrupted exceptions
- }
- }
+ slowWriterThread.start();
+ progressParent.uninterruptiblyAcquire(60000);
- // Start a worker thread which will wait for the slow worker thread.
- Thread waitingWorker = new Thread(new Runnable() {
+ // Start a worker thread which will attempt to stop the writer.
+ Thread stopWriterThread = new Thread(new Runnable() {
@Override
public void run() {
try {
- // Attempt to terminate the other worker thread and take ownership
- // of the ReplicaInPipeline.
- LOG.debug("waitingWorker initiating recovery");
- spyDN.initReplicaRecovery(recoveringBlock);
- LOG.debug("waitingWorker initiated recovery");
+ LOG.debug("initiating " + tswr.opName());
+ tswr.run(recoveringBlock);
+ LOG.debug("finished " + tswr.opName());
} catch (Throwable t) {
- GenericTestUtils.assertExceptionContains("meta does not exist", t);
+ LOG.error("stopWriterThread got unexpected exception for " +
+ tswr.opName(), t);
+ failure.compareAndSet(null, "stopWriterThread got unexpected " +
+ "exception for " + tswr.opName() + ": " + t.getMessage());
}
}
});
- waitingWorker.start();
+ stopWriterThread.start();
- // Do an operation that requires the lock. This should not be blocked
- // by the replica recovery in progress.
+ while (!terminateSlowWriter.gotInterruption.get()) {
+ // Wait until stopWriterThread attempts to stop our slow writer by sending
+ // it an InterruptedException.
+ Thread.sleep(1);
+ }
+
+ // We know that stopWriterThread is in the process of joining our slow
+ // writer. It must not hold the lock during this operation.
+ // In order to test that it does not, we attempt to do an operation that
+ // requires the lock-- getReplicaString.
spyDN.getFSDataset().getReplicaString(
recoveringBlock.getBlock().getBlockPoolId(),
recoveringBlock.getBlock().getBlockId());
- // Wait for the two worker threads to exit normally.
- terminateSlowWorker.release();
- slowWorker.join();
- waitingWorker.join();
- Assert.assertFalse("The slowWriter thread failed.", failure.get());
+ // Tell the slow writer to exit, and then wait for all threads to join.
+ terminateSlowWriter.sem.release();
+ slowWriterThread.join();
+ stopWriterThread.join();
+
+ // Check that our worker threads exited cleanly. This is not checked by the
+ // unit test framework, so we have to do it manually here.
+ String failureReason = failure.get();
+ if (failureReason != null) {
+ Assert.fail("Thread failure: " + failureReason);
+ }
}
}