This is an automated email from the ASF dual-hosted git repository.
kihwal pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 232e9f8 HDFS-15147. LazyPersistTestCase wait logic is error-prone. Contributed by Ahmed Hussein.
232e9f8 is described below
commit 232e9f8ee117a29a2f8f1360bebd8d0e8def826a
Author: Kihwal Lee <[email protected]>
AuthorDate: Thu Feb 27 09:58:44 2020 -0600
HDFS-15147. LazyPersistTestCase wait logic is error-prone. Contributed by Ahmed Hussein.
---
.../hdfs/server/blockmanagement/BlockManager.java | 11 +-
.../hadoop/hdfs/server/namenode/FSNamesystem.java | 18 +-
.../java/org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +-
.../fsdataset/impl/LazyPersistTestCase.java | 234 ++++++++++++++++++---
.../fsdataset/impl/TestLazyPersistFiles.java | 77 +++----
.../impl/TestLazyPersistReplicaPlacement.java | 2 +-
.../datanode/fsdataset/impl/TestLazyWriter.java | 6 +-
7 files changed, 260 insertions(+), 90 deletions(-)
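The substance of the patch: the tests stop sleeping for fixed intervals and instead poll for the condition they actually care about with GenericTestUtils.waitFor(check, pollIntervalMs, timeoutMs), as seen throughout the hunks below. A minimal illustrative sketch of that idiom follows; the class name and the AtomicBoolean flag are stand-ins for exposition, not code from this commit.

    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicBoolean;
    import com.google.common.base.Supplier;
    import org.apache.hadoop.test.GenericTestUtils;

    class WaitPatternSketch {
      // Poll a condition every 10 ms and give up with a TimeoutException
      // after 30 s, instead of an unconditional Thread.sleep() of a fixed
      // length. The wait returns as soon as the condition holds.
      static void awaitFlag(final AtomicBoolean flag)
          throws TimeoutException, InterruptedException {
        GenericTestUtils.waitFor(new Supplier<Boolean>() {
          @Override
          public Boolean get() {
            return flag.get();
          }
        }, 10, 30 * 1000);
      }
    }

Polling this way makes the tests faster on healthy runs (no mandatory full sleep) and less flaky on slow hosts (the condition gets the whole timeout to become true).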
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index fd8739e..5addf5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -46,9 +46,7 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-
import javax.management.ObjectName;
-
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -233,6 +231,8 @@ public class BlockManager implements BlockStatsMXBean {
/** Replication thread. */
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
+ /** Timestamp for the last cycle of the redundancy thread. */
+ private final AtomicLong lastReplicationCycleTS = new AtomicLong(-1);
/** Block report thread for handling async reports. */
private final BlockReportProcessingThread blockReportThread =
@@ -3986,11 +3986,15 @@ public class BlockManager implements BlockStatsMXBean {
return neededReplications.size();
}
+ @VisibleForTesting
+ public long getLastReplicationCycleTS() {
+ return lastReplicationCycleTS.get();
+ }
+
/**
* Periodically calls computeReplicationWork().
*/
private class ReplicationMonitor implements Runnable {
-
@Override
public void run() {
while (namesystem.isRunning()) {
@@ -4000,6 +4004,7 @@ public class BlockManager implements BlockStatsMXBean {
computeDatanodeWork();
processPendingReplications();
rescanPostponedMisreplicatedBlocks();
+ lastReplicationCycleTS.set(Time.monotonicNow());
}
Thread.sleep(replicationRecheckInterval);
} catch (Throwable t) {
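The hunk above adds the cycle-timestamp handshake that the new test helpers rely on: the ReplicationMonitor records Time.monotonicNow() after each work pass, and a test snapshots the value, then polls until it changes, which proves a complete cycle ran after the snapshot was taken. A compact illustrative sketch of the pattern (class and method names here are not from the patch):

    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicLong;
    import com.google.common.base.Supplier;
    import org.apache.hadoop.test.GenericTestUtils;
    import org.apache.hadoop.util.Time;

    class CycleStampSketch {
      // Published by the background monitor at the end of every cycle.
      private final AtomicLong lastCycleTS = new AtomicLong(-1);

      void onCycleFinished() {
        lastCycleTS.set(Time.monotonicNow());
      }

      // Called by a test: remember the current stamp, then wait until it
      // moves, i.e. at least one full cycle completed after this point.
      void waitForNextCycle() throws TimeoutException, InterruptedException {
        final long seen = lastCycleTS.get();
        GenericTestUtils.waitFor(new Supplier<Boolean>() {
          @Override
          public Boolean get() {
            return lastCycleTS.get() != seen;
          }
        }, 20, 10 * 1000);
      }
    }

FSNamesystem gets the same treatment below for the LazyPersistFileScrubber, and LazyPersistTestCase builds waitForRedundancyMonitorCycle() and waitForScrubberCycle() on top of the two timestamps.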
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index f0af5b4..11ac3fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -87,9 +87,12 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROU
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_KEY;
-import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
+
+import java.util.concurrent.atomic.AtomicLong;
+
import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE;
import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.OBSERVER;
@@ -294,6 +297,7 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.log4j.Appender;
import org.apache.log4j.AsyncAppender;
@@ -450,6 +454,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// A daemon to periodically clean up corrupt lazyPersist files
// from the name space.
Daemon lazyPersistFileScrubber = null;
+ private final AtomicLong lazyPersistFileScrubberTS = new AtomicLong(0);
+
// Executor to warm up EDEK cache
private ExecutorService edekCacheLoader = null;
@@ -603,6 +609,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return leaseManager;
}
+ @VisibleForTesting
+ public long getLazyPersistFileScrubberTS() {
+ return lazyPersistFileScrubber == null ? -1
+ : lazyPersistFileScrubberTS.get();
+ }
+
public boolean isHaEnabled() {
return haEnabled;
}
@@ -3943,6 +3955,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
try {
if (!isInSafeMode()) {
clearCorruptLazyPersistFiles();
+ // Set the timestamp of the last cycle.
+ lazyPersistFileScrubberTS.set(Time.monotonicNow());
} else {
if (FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG
@@ -3953,7 +3967,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
FSNamesystem.LOG.error(
"Ignoring exception in LazyPersistFileScrubber:", e);
}
-
try {
Thread.sleep(scrubIntervalSec * 1000);
} catch (InterruptedException e) {
@@ -7183,4 +7196,3 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
}
-
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index f81e90e3..bee3314 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1909,7 +1909,7 @@ public class DFSTestUtil {
throw new UnhandledException("Test failed due to unexpected exception", e);
}
}
- }, 1000, 60000);
+ }, 50, 60000);
}
public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
index 13ea940..ef34222 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.base.Supplier;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.fs.StorageType.DEFAULT;
@@ -36,15 +37,27 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -81,16 +94,28 @@ public abstract class LazyPersistTestCase {
GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.DEBUG);
}
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(LazyPersistTestCase.class);
protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
protected static final int BUFFER_LENGTH = 4096;
- private static final long HEARTBEAT_INTERVAL_SEC = 1;
- private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
- private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
- private static final String JMX_SERVICE_NAME = "DataNode";
protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
- protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
protected static final short REPL_FACTOR = 1;
+ private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
+ private static final String JMX_SERVICE_NAME = "DataNode";
+ private static final long HEARTBEAT_INTERVAL_SEC = 1;
+ private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
+ private static final int WAIT_FOR_FBR_MS = 10 * 1000;
+ private static final int WAIT_FOR_STORAGE_TYPES_MS = 30 * 1000;
+ private static final int WAIT_FOR_ASYNC_DELETE_MS = 10 * 1000;
+ private static final int WAIT_FOR_DN_SHUTDOWN_MS = 30 * 1000;
+ private static final int WAIT_FOR_REDUNDANCY_MS =
+ 2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000;
+ private static final int WAIT_FOR_LAZY_SCRUBBER_MS =
+ 2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000;
+ private static final int WAIT_POLL_INTERVAL_MS = 10;
+ private static final int WAIT_POLL_INTERVAL_LARGE_MS = 20;
+
protected final long osPageSize =
NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
@@ -154,7 +179,7 @@ public abstract class LazyPersistTestCase {
return false;
}
}
- }, 100, 30 * 1000);
+ }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS);
return client.getLocatedBlocks(path.toString(), 0, fileLength);
}
@@ -429,11 +454,38 @@ public abstract class LazyPersistTestCase {
private boolean disableScrubber=false;
}
+ /**
+ * Force a full block report (FBR) on all the datanodes.
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
protected final void triggerBlockReport()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException, TimeoutException {
// Trigger block report to NN
- DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
- Thread.sleep(10 * 1000);
+ final Map<DatanodeStorageInfo, Integer> reportCounts = new HashMap<>();
+ final FSNamesystem fsn = cluster.getNamesystem();
+ for (DataNode dn : cluster.getDataNodes()) {
+ final DatanodeDescriptor dnd =
+ NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+ final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
+ reportCounts.put(storage, storage.getBlockReportCount());
+ DataNodeTestUtils.triggerBlockReport(dn);
+ }
+ // wait for block reports to be received.
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ for (Entry<DatanodeStorageInfo, Integer> repCntEntry : reportCounts
+ .entrySet()) {
+ if (repCntEntry.getValue() == repCntEntry.getKey()
+ .getBlockReportCount()) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_FBR_MS);
}
protected final boolean verifyBlockDeletedFromDir(File dir,
@@ -445,51 +497,65 @@ public abstract class LazyPersistTestCase {
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
if (blockFile.exists()) {
- LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
- " exists after deletion.");
return false;
}
File metaFile = new File(targetDir,
DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
lb.getBlock().getGenerationStamp()));
if (metaFile.exists()) {
- LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
- " exists after deletion.");
return false;
}
}
return true;
}
- protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
- throws IOException, InterruptedException {
+ protected final boolean verifyDeletedBlocks(final LocatedBlocks locatedBlocks)
+ throws Exception {
LOG.info("Verifying replica has no saved copy after deletion.");
triggerBlockReport();
+ final DataNode dn = cluster.getDataNodes().get(0);
- while(
- cluster.getFsDatasetTestUtils(0).getPendingAsyncDeletions()
- > 0L){
- Thread.sleep(1000);
- }
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ for (DataNode dn1 : cluster.getDataNodes()) {
+ if (cluster.getFsDatasetTestUtils(dn1).getPendingAsyncDeletions()
+ > 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);
final String bpid = cluster.getNamesystem().getBlockPoolId();
- final FsDatasetSpi<?> dataset =
- cluster.getDataNodes().get(0).getFSDataset();
-
+ final FsDatasetSpi<?> dataset = dn.getFSDataset();
- // Make sure deleted replica does not have a copy on either finalized dir of
- // transient volume or finalized dir of non-transient volume
+ // transient volume or finalized dir of non-transient volume.
+ // We need to wait until the async deletion is scheduled.
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
- for (FsVolumeSpi vol : volumes) {
- FsVolumeImpl volume = (FsVolumeImpl) vol;
- File targetDir = (volume.isTransientStorage()) ?
- volume.getBlockPoolSlice(bpid).getFinalizedDir() :
- volume.getBlockPoolSlice(bpid).getLazypersistDir();
- if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
- return false;
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ for (FsVolumeSpi vol : volumes) {
+ FsVolumeImpl volume = (FsVolumeImpl) vol;
+ File targetDir = (volume.isTransientStorage()) ?
+ volume.getBlockPoolSlice(bpid).getFinalizedDir() :
+ volume.getBlockPoolSlice(bpid).getLazypersistDir();
+ if (!LazyPersistTestCase.this
+ .verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
+ return false;
+ }
+ }
+ return true;
+ } catch (IOException ie) {
+ return false;
+ }
}
- }
+ }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);
}
return true;
}
@@ -534,4 +600,104 @@ public abstract class LazyPersistTestCase {
FsDatasetImpl fsDataset = (FsDatasetImpl) dn.getFSDataset();
fsDataset.evictLazyPersistBlocks(Long.MAX_VALUE); // Run one eviction cycle.
}
+
+ /**
+ * Shut down the DataNodes and wait until the NN detects them as dead.
+ */
+ protected void shutdownDataNodes()
+ throws TimeoutException, InterruptedException {
+ cluster.shutdownDataNodes();
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ DatanodeInfo[] info = client.datanodeReport(
+ HdfsConstants.DatanodeReportType.LIVE);
+ return info.length == 0;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+ }, WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_DN_SHUTDOWN_MS);
+ }
+
+ protected void waitForCorruptBlock(final long corruptCnt)
+ throws TimeoutException, InterruptedException {
+ // wait for the redundancy monitor to mark the file as corrupt.
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ Iterator<BlockInfo> bInfoIter = cluster.getNameNode()
+ .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator();
+ int count = 0;
+ while (bInfoIter.hasNext()) {
+ bInfoIter.next();
+ count++;
+ }
+ return corruptCnt == count;
+ }
+ }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+ }
+
+ protected void waitForScrubberCycle()
+ throws TimeoutException, InterruptedException {
+ // wait for the lazy persist file scrubber to complete a full cycle.
+ final FSNamesystem fsn = cluster.getNamesystem();
+ final long lastTimeStamp = fsn.getLazyPersistFileScrubberTS();
+ if (lastTimeStamp == -1) { // scrubber is disabled
+ return;
+ }
+ GenericTestUtils.waitFor(
+ new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return lastTimeStamp != fsn.getLazyPersistFileScrubberTS();
+ }
+ }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_LAZY_SCRUBBER_MS);
+ }
+
+ protected void waitForRedundancyMonitorCycle()
+ throws TimeoutException, InterruptedException {
+ // wait for the redundancy monitor to complete a full cycle.
+ final BlockManager bm = cluster.getNamesystem().getBlockManager();
+ final long lastRedundancyTS =
+ bm.getLastReplicationCycleTS();
+
+ GenericTestUtils.waitFor(
+ new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return lastRedundancyTS != bm.getLastReplicationCycleTS();
+ }
+ },
+ 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+ }
+
+ protected void waitForRedundancyCount(final long cnt)
+ throws TimeoutException, InterruptedException {
+ final BlockManager bm = cluster.getNamesystem().getBlockManager();
+
+ GenericTestUtils.waitFor(
+ new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return cnt == bm.getUnderReplicatedBlocksCount();
+ }
+ }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+ }
+
+ protected void waitForFile(final Path path, final boolean expected)
+ throws TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ return expected == fs.exists(path);
+ } catch (IOException e) {
+ return false;
+ }
+ }
+ }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS);
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 8c43592..b177a7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -16,8 +16,6 @@
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-import com.google.common.collect.Iterators;
-import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
@@ -33,7 +31,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
@@ -98,28 +95,20 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
- // Stop the DataNode and sleep for the time it takes the NN to
- // detect the DN as being dead.
- cluster.shutdownDataNodes();
- Thread.sleep(30000L);
+ // Stop the DataNode.
+ shutdownDataNodes();
assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));
- // Next, wait for the replication monitor to mark the file as corrupt
- Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
-
+ // Next, wait for the redundancy monitor to mark the file as corrupt.
+ waitForRedundancyMonitorCycle();
// Wait for the LazyPersistFileScrubber to run
- Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
-
+ waitForScrubberCycle();
// Ensure that path1 does not exist anymore, whereas path2 does.
- assert(!fs.exists(path1));
+ waitForFile(path1, false);
// We should have zero blocks that needs replication i.e. the one
- // belonging to path2.
- assertThat(cluster.getNameNode()
- .getNamesystem()
- .getBlockManager()
- .getUnderReplicatedBlocksCount(),
- is(0L));
+ // belonging to path2. This needs a wait.
+ waitForRedundancyCount(0L);
}
@Test
@@ -134,18 +123,14 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
// Stop the DataNode and sleep for the time it takes the NN to
- // detect the DN as being dead.
- cluster.shutdownDataNodes();
- Thread.sleep(30000L);
-
- // Next, wait for the replication monitor to mark the file as corrupt
- Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
+ shutdownDataNodes();
+ // wait for the redundancy monitor to mark the file as corrupt.
+ waitForCorruptBlock(1L);
// Wait for the LazyPersistFileScrubber to run
- Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
-
+ waitForScrubberCycle();
// Ensure that path1 exist.
- Assert.assertTrue(fs.exists(path1));
-
+ waitForFile(path1, true);
}
/**
@@ -160,21 +145,14 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
- cluster.shutdownDataNodes();
+ shutdownDataNodes();
cluster.restartNameNodes();
- // wait for the replication monitor to mark the file as corrupt
- Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
-
- Long corruptBlkCount = (long) Iterators.size(cluster.getNameNode()
- .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator());
-
- // Check block detected as corrupted
- assertThat(corruptBlkCount, is(1L));
-
+ // wait for the redundancy monitor to mark the file as corrupt.
+ waitForCorruptBlock(1L);
// Ensure path1 exist.
- Assert.assertTrue(fs.exists(path1));
+ waitForFile(path1, true);
}
/**
@@ -216,10 +194,19 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
threads[i].start();
}
- Thread.sleep(500);
-
for (int i = 0; i < NUM_TASKS; i++) {
- Uninterruptibles.joinUninterruptibly(threads[i]);
+ boolean interrupted = false;
+ while (true) {
+ try {
+ threads[i].join();
+ break;
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
}
Assert.assertFalse(testFailed.get());
}
@@ -233,7 +220,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
*/
@Test
public void testConcurrentWrites()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException, TimeoutException {
getClusterBuilder().setRamDiskReplicaCapacity(9).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
final int SEED = 0xFADED;
@@ -282,11 +269,11 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
this.seed = seed;
this.latch = latch;
this.bFail = bFail;
- System.out.println("Creating Writer: " + id);
+ LOG.info("Creating Writer: " + id);
}
public void run() {
- System.out.println("Writer " + id + " starting... ");
+ LOG.info("Writer " + id + " starting... ");
int i = 0;
try {
for (i = 0; i < paths.length; i++) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
index c16dbe5..b6413ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
@@ -119,7 +119,7 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
*/
@Test
public void testFallbackToDiskPartial()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException, TimeoutException {
getClusterBuilder().setMaxLockedMemory(2 * BLOCK_SIZE).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
index 1680764..56cc41e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -156,7 +157,6 @@ public class TestLazyWriter extends LazyPersistTestCase {
for (int i = 0; i < NUM_PATHS; ++i) {
makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true);
triggerBlockReport();
- Thread.sleep(3000);
ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK);
ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT);
for (int j = i + 1; j < NUM_PATHS; ++j) {
@@ -183,13 +183,13 @@ public class TestLazyWriter extends LazyPersistTestCase {
throws Exception {
getClusterBuilder().build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
- FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+ final DataNode dn = cluster.getDataNodes().get(0);
+ FsDatasetTestUtil.stopLazyWriter(dn);
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
LocatedBlocks locatedBlocks =
ensureFileReplicasOnStorageType(path, RAM_DISK);
-
// Delete before persist
client.delete(path.toString(), false);
Assert.assertFalse(fs.exists(path));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]