Repository: hadoop Updated Branches: refs/heads/trunk 0058eadbd -> 463aec117
http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
new file mode 100644
index 0000000..c762849
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.tools.JMXGet;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+public abstract class LazyPersistTestCase {
+
+  static {
+    ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
+  protected static final int BUFFER_LENGTH = 4096;
+  protected static final int EVICTION_LOW_WATERMARK = 1;
+  private static final long HEARTBEAT_INTERVAL_SEC = 1;
+  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
+  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
+  private static final String JMX_SERVICE_NAME = "DataNode";
+  protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
+  protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
+  protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
+  protected static final short REPL_FACTOR = 1;
+
+  protected MiniDFSCluster cluster;
+  protected DistributedFileSystem fs;
+  protected DFSClient client;
+  protected JMXGet jmx;
+  protected TemporarySocketDirectory sockDir;
+
+  @After
+  public void shutDownCluster() throws Exception {
+
+    // Dump all RamDisk JMX metrics before shutting down the cluster
+    printRamDiskJMXMetrics();
+
+    if (fs != null) {
+      fs.close();
+      fs = null;
+      client = null;
+    }
+
+    if (cluster != null) {
+      cluster.shutdownDataNodes();
+      cluster.shutdown();
+      cluster = null;
+    }
+
+    if (jmx != null) {
+      jmx = null;
+    }
+
+    IOUtils.closeQuietly(sockDir);
+    sockDir = null;
+  }
+
+  @Rule
+  public Timeout timeout = new Timeout(300000);
+
+  protected final LocatedBlocks ensureFileReplicasOnStorageType(
+      Path path, StorageType storageType) throws IOException {
+    // Ensure that the returned block locations are correct!
+    LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
+    assertThat(fs.exists(path), is(true));
+    long fileLength = client.getFileInfo(path.toString()).getLen();
+    LocatedBlocks locatedBlocks =
+        client.getLocatedBlocks(path.toString(), 0, fileLength);
+    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+      assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
+    }
+    return locatedBlocks;
+  }
+
+  protected final void makeRandomTestFile(Path path, long length,
+      boolean isLazyPersist, long seed) throws IOException {
+    DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
+        BLOCK_SIZE, REPL_FACTOR, seed, true);
+  }
+
+  protected final void makeTestFile(Path path, long length,
+      boolean isLazyPersist) throws IOException {
+
+    EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+
+    if (isLazyPersist) {
+      createFlags.add(LAZY_PERSIST);
+    }
+
+    FSDataOutputStream fos = null;
+    try {
+      fos =
+          fs.create(path,
+              FsPermission.getFileDefault(),
+              createFlags,
+              BUFFER_LENGTH,
+              REPL_FACTOR,
+              BLOCK_SIZE,
+              null);
+
+      // Write exactly 'length' bytes (allocating blocks), one buffer at a time.
+      byte[] buffer = new byte[BUFFER_LENGTH];
+      for (long written = 0, chunk; written < length; written += chunk) {
+        chunk = Math.min(buffer.length, length - written);
+        fos.write(buffer, 0, (int) chunk);
+      }
+      if (length > 0) {
+        fos.hsync();
+      }
+    } finally {
+      IOUtils.closeQuietly(fos);
+    }
+  }
+
+  /**
+   * If ramDiskReplicaCapacity is >= 0, then RAM_DISK capacity is artificially
+   * capped to that many replicas. If it is < 0 then it is ignored.
+   */
+  protected final void startUpCluster(boolean hasTransientStorage,
+      final int ramDiskReplicaCapacity,
+      final boolean useSCR,
+      final boolean useLegacyBlockReaderLocal)
+      throws IOException {
+
+    Configuration conf = new Configuration();
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+        LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        HEARTBEAT_RECHECK_INTERVAL_MSEC);
+    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+        LAZY_WRITER_INTERVAL_SEC);
+    conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
+        EVICTION_LOW_WATERMARK * BLOCK_SIZE);
+
+    if (useSCR) {
+      conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+      // Do not share a client context across tests.
+      conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
+      if (useLegacyBlockReaderLocal) {
+        conf.setBoolean(DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
+        conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+            UserGroupInformation.getCurrentUser().getShortUserName());
+      } else {
+        sockDir = new TemporarySocketDirectory();
+        conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
+            this.getClass().getSimpleName() + "._PORT.sock").getAbsolutePath());
+      }
+    }
+
+    long[] capacities = null;
+    if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
+      // Convert replica count to byte count, add some delta for .meta and
+      // VERSION files.
+      long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) +
+          (BLOCK_SIZE - 1);
+      capacities = new long[] { ramDiskStorageLimit, -1 };
+    }
+
+    cluster = new MiniDFSCluster
+        .Builder(conf)
+        .numDataNodes(REPL_FACTOR)
+        .storageCapacities(capacities)
+        .storageTypes(hasTransientStorage ? new StorageType[]{ RAM_DISK, DEFAULT } : null)
+        .build();
+    fs = cluster.getFileSystem();
+    client = fs.getClient();
+    try {
+      jmx = initJMX();
+    } catch (Exception e) {
+      fail("Failed initialize JMX for testing: " + e);
+    }
+    LOG.info("Cluster startup complete");
+  }
+
+  /**
+   * If ramDiskStorageLimit is >= 0, then the RAM_DISK capacity of the first
+   * DataNode is artificially capped. If it is < 0 then it is ignored.
+   */
+  protected final void startUpCluster(final int numDataNodes,
+      final StorageType[] storageTypes,
+      final long ramDiskStorageLimit,
+      final boolean useSCR)
+      throws IOException {
+
+    Configuration conf = new Configuration();
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+        LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        HEARTBEAT_RECHECK_INTERVAL_MSEC);
+    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+        LAZY_WRITER_INTERVAL_SEC);
+
+    if (useSCR) {
+      conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+      // Do not share a client context across tests.
+      conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
+      sockDir = new TemporarySocketDirectory();
+      conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
+          this.getClass().getSimpleName() + "._PORT.sock").getAbsolutePath());
+      conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+          UserGroupInformation.getCurrentUser().getShortUserName());
+    }
+
+    cluster = new MiniDFSCluster
+        .Builder(conf)
+        .numDataNodes(numDataNodes)
+        .storageTypes(storageTypes != null ?
+            storageTypes : new StorageType[] { DEFAULT, DEFAULT })
+        .build();
+    fs = cluster.getFileSystem();
+    client = fs.getClient();
+
+    // Artificially cap the storage capacity of the RAM_DISK volume.
+    if (ramDiskStorageLimit >= 0) {
+      List<? extends FsVolumeSpi> volumes =
+          cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+      for (FsVolumeSpi volume : volumes) {
+        if (volume.getStorageType() == RAM_DISK) {
+          ((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
+        }
+      }
+    }
+
+    LOG.info("Cluster startup complete");
+  }
+
+  protected final void startUpCluster(boolean hasTransientStorage,
+      final int ramDiskReplicaCapacity)
+      throws IOException {
+    startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false, false);
+  }
+
+  protected final void triggerBlockReport()
+      throws IOException, InterruptedException {
+    // Trigger block report to NN
+    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
+    Thread.sleep(10 * 1000);
+  }
+
+  protected final boolean verifyBlockDeletedFromDir(File dir,
+      LocatedBlocks locatedBlocks) {
+
+    for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+      File targetDir =
+          DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
+
+      File blockFile = new File(targetDir, lb.getBlock().getBlockName());
+      if (blockFile.exists()) {
+        LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
+            " exists after deletion.");
+        return false;
+      }
+      File metaFile = new File(targetDir,
+          DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
+              lb.getBlock().getGenerationStamp()));
+      if (metaFile.exists()) {
+        LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
+            " exists after deletion.");
+        return false;
+      }
+    }
+    return true;
+  }
+
+  protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
+      throws IOException, InterruptedException {
+
+    LOG.info("Verifying replica has no saved copy after deletion.");
+    triggerBlockReport();
+
+    while(
+        DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
+        > 0L){
+      Thread.sleep(1000);
+    }
+
+    final String bpid = cluster.getNamesystem().getBlockPoolId();
+    List<? extends FsVolumeSpi> volumes =
+        cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+    // Make sure the deleted replica has no copy left in either the finalized
+    // dir of a transient volume or the lazypersist dir of any other volume.
+    for (FsVolumeSpi v : volumes) {
+      FsVolumeImpl volume = (FsVolumeImpl) v;
+      File targetDir = (v.isTransientStorage()) ?
+          volume.getBlockPoolSlice(bpid).getFinalizedDir() :
+          volume.getBlockPoolSlice(bpid).getLazypersistDir();
+      if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  protected final void verifyRamDiskJMXMetric(String metricName,
+      long expectedValue) throws Exception {
+    assertEquals(expectedValue, Long.parseLong(jmx.getValue(metricName)));
+  }
+
+  protected final boolean verifyReadRandomFile(
+      Path path, int fileLength, int seed) throws IOException {
+    byte[] contents = DFSTestUtil.readFileBuffer(fs, path);
+    byte[] expected = DFSTestUtil.
+        calculateFileContentsFromSeed(seed, fileLength);
+    return Arrays.equals(contents, expected);
+  }
+
+  private JMXGet initJMX() throws Exception {
+    JMXGet jmx = new JMXGet();
+    jmx.setService(JMX_SERVICE_NAME);
+    jmx.init();
+    return jmx;
+  }
+
+  private void printRamDiskJMXMetrics() {
+    try {
+      if (jmx != null) {
+        jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to print RamDisk JMX metrics", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 444afed..771609c 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java @@ -17,103 +17,45 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import com.google.common.util.concurrent.Uninterruptibles; -import org.apache.commons.io.IOUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.*; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; -import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.tools.JMXGet; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.log4j.Level; -import org.junit.After; import org.junit.Assert; import org.junit.Test; -import java.io.*; -import java.util.*; +import java.io.File; +import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; -import static 
org.apache.hadoop.fs.CreateFlag.CREATE; -import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST; import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import static org.apache.hadoop.hdfs.StorageType.DEFAULT; import static org.apache.hadoop.hdfs.StorageType.RAM_DISK; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsNot.not; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class TestLazyPersistFiles { - public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class); - - static { - ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL); - ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL); - ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL); - } - +public class TestLazyPersistFiles extends LazyPersistTestCase { private static final byte LAZY_PERSIST_POLICY_ID = (byte) 15; private static final int THREADPOOL_SIZE = 10; - private static final short REPL_FACTOR = 1; - private static final int BLOCK_SIZE = 5 * 1024 * 1024; - private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3; - private static final long HEARTBEAT_INTERVAL_SEC = 1; - private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500; - private static final int LAZY_WRITER_INTERVAL_SEC = 1; - private static final int BUFFER_LENGTH = 4096; - private static final int EVICTION_LOW_WATERMARK = 1; - private static final String JMX_SERVICE_NAME = "DataNode"; - private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk"; - - private MiniDFSCluster cluster; - private DistributedFileSystem fs; - private DFSClient client; - private Configuration conf; - private JMXGet jmx; - - @After - public void shutDownCluster() throws Exception { - - // Dump all RamDisk JMX metrics before shutdown the cluster - printRamDiskJMXMetrics(); - - if (fs != null) { - fs.close(); - fs = 
null; - client = null; - } - - if (cluster != null) { - cluster.shutdownDataNodes(); - cluster.shutdown(); - cluster = null; - } - - if (jmx != null) { - jmx = null; - } - } - - @Test (timeout=300000) + @Test public void testPolicyNotSetByDefault() throws IOException { startUpCluster(false, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); @@ -126,7 +68,7 @@ public class TestLazyPersistFiles { assertThat(status.getStoragePolicy(), not(LAZY_PERSIST_POLICY_ID)); } - @Test (timeout=300000) + @Test public void testPolicyPropagation() throws IOException { startUpCluster(false, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); @@ -138,7 +80,7 @@ public class TestLazyPersistFiles { assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID)); } - @Test (timeout=300000) + @Test public void testPolicyPersistenceInEditLog() throws IOException { startUpCluster(false, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); @@ -152,7 +94,7 @@ public class TestLazyPersistFiles { assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID)); } - @Test (timeout=300000) + @Test public void testPolicyPersistenceInFsImage() throws IOException { startUpCluster(false, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); @@ -170,7 +112,7 @@ public class TestLazyPersistFiles { assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID)); } - @Test (timeout=300000) + @Test public void testPlacementOnRamDisk() throws IOException { startUpCluster(true, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); @@ -180,7 +122,7 @@ public class TestLazyPersistFiles { ensureFileReplicasOnStorageType(path, RAM_DISK); } - @Test (timeout=300000) + @Test public void testPlacementOnSizeLimitedRamDisk() throws IOException { startUpCluster(true, 3); final String METHOD_NAME = GenericTestUtils.getMethodName(); @@ -199,7 +141,7 @@ public class TestLazyPersistFiles { * Write should default to disk. No error. 
* @throws IOException */ - @Test (timeout=300000) + @Test public void testFallbackToDisk() throws IOException { startUpCluster(false, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); @@ -213,7 +155,7 @@ public class TestLazyPersistFiles { * File can not fit in RamDisk even with eviction * @throws IOException */ - @Test (timeout=300000) + @Test public void testFallbackToDiskFull() throws Exception { startUpCluster(false, 0); final String METHOD_NAME = GenericTestUtils.getMethodName(); @@ -231,7 +173,7 @@ public class TestLazyPersistFiles { * Expect 2 or less blocks are on RamDisk and 3 or more on disk. * @throws IOException */ - @Test (timeout=300000) + @Test public void testFallbackToDiskPartial() throws IOException, InterruptedException { startUpCluster(true, 2); @@ -271,7 +213,7 @@ public class TestLazyPersistFiles { * * @throws IOException */ - @Test (timeout=300000) + @Test public void testRamDiskNotChosenByDefault() throws IOException { startUpCluster(true, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); @@ -289,7 +231,7 @@ public class TestLazyPersistFiles { * Append to lazy persist file is denied. * @throws IOException */ - @Test (timeout=300000) + @Test public void testAppendIsDenied() throws IOException { startUpCluster(true, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); @@ -310,7 +252,7 @@ public class TestLazyPersistFiles { * must be discarded by the NN, instead of being kept around as a * 'corrupt' file. */ - @Test (timeout=300000) + @Test public void testLazyPersistFilesAreDiscarded() throws IOException, InterruptedException { startUpCluster(true, 2); @@ -344,7 +286,7 @@ public class TestLazyPersistFiles { is(0L)); } - @Test (timeout=300000) + @Test public void testLazyPersistBlocksAreSaved() throws IOException, InterruptedException { startUpCluster(true, -1); @@ -399,7 +341,7 @@ public class TestLazyPersistFiles { * RamDisk eviction after lazy persist to disk. 
* @throws Exception */ - @Test (timeout=300000) + @Test public void testRamDiskEviction() throws Exception { startUpCluster(true, 1 + EVICTION_LOW_WATERMARK); final String METHOD_NAME = GenericTestUtils.getMethodName(); @@ -434,7 +376,7 @@ public class TestLazyPersistFiles { * @throws IOException * @throws InterruptedException */ - @Test (timeout=300000) + @Test public void testRamDiskEvictionBeforePersist() throws IOException, InterruptedException { startUpCluster(true, 1); @@ -459,7 +401,7 @@ public class TestLazyPersistFiles { assert(fs.exists(path1)); assert(fs.exists(path2)); - verifyReadRandomFile(path1, BLOCK_SIZE, SEED); + assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED)); } /** @@ -467,7 +409,7 @@ public class TestLazyPersistFiles { * @throws IOException * @throws InterruptedException */ - @Test (timeout=300000) + @Test public void testRamDiskEvictionIsLru() throws Exception { final int NUM_PATHS = 5; @@ -529,7 +471,7 @@ public class TestLazyPersistFiles { * Memory is freed up and file is gone. * @throws IOException */ - @Test // (timeout=300000) + @Test public void testDeleteBeforePersist() throws Exception { startUpCluster(true, -1); @@ -556,7 +498,7 @@ public class TestLazyPersistFiles { * @throws IOException * @throws InterruptedException */ - @Test (timeout=300000) + @Test public void testDeleteAfterPersist() throws Exception { startUpCluster(true, -1); @@ -584,7 +526,7 @@ public class TestLazyPersistFiles { * @throws IOException * @throws InterruptedException */ - @Test (timeout=300000) + @Test public void testDfsUsageCreateDelete() throws IOException, InterruptedException { startUpCluster(true, 4); @@ -615,7 +557,7 @@ public class TestLazyPersistFiles { /** * Concurrent read from the same node and verify the contents. 
*/ - @Test (timeout=300000) + @Test public void testConcurrentRead() throws Exception { startUpCluster(true, 2); @@ -666,7 +608,7 @@ public class TestLazyPersistFiles { * @throws IOException * @throws InterruptedException */ - @Test (timeout=300000) + @Test public void testConcurrentWrites() throws IOException, InterruptedException { startUpCluster(true, 9); @@ -702,7 +644,7 @@ public class TestLazyPersistFiles { assertThat(testFailed.get(), is(false)); } - @Test (timeout=300000) + @Test public void testDnRestartWithSavedReplicas() throws IOException, InterruptedException { @@ -726,7 +668,7 @@ public class TestLazyPersistFiles { ensureFileReplicasOnStorageType(path1, DEFAULT); } - @Test (timeout=300000) + @Test public void testDnRestartWithUnsavedReplicas() throws IOException, InterruptedException { @@ -746,183 +688,6 @@ public class TestLazyPersistFiles { ensureFileReplicasOnStorageType(path1, RAM_DISK); } - // ---- Utility functions for all test cases ------------------------------- - - /** - * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially - * capped. If ramDiskStorageLimit < 0 then it is ignored. 
- */ - private void startUpCluster(boolean hasTransientStorage, - final int ramDiskReplicaCapacity, - final boolean useSCR) - throws IOException { - - conf = new Configuration(); - conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); - conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, - LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC); - conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC); - conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, - HEARTBEAT_RECHECK_INTERVAL_MSEC); - conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, - LAZY_WRITER_INTERVAL_SEC); - conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, - EVICTION_LOW_WATERMARK * BLOCK_SIZE); - - conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR); - - long[] capacities = null; - if (hasTransientStorage && ramDiskReplicaCapacity >= 0) { - // Convert replica count to byte count, add some delta for .meta and VERSION files. - long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) + (BLOCK_SIZE - 1); - capacities = new long[] { ramDiskStorageLimit, -1 }; - } - - cluster = new MiniDFSCluster - .Builder(conf) - .numDataNodes(REPL_FACTOR) - .storageCapacities(capacities) - .storageTypes(hasTransientStorage ? 
new StorageType[]{ RAM_DISK, DEFAULT } : null) - .build(); - fs = cluster.getFileSystem(); - client = fs.getClient(); - try { - jmx = initJMX(); - } catch (Exception e) { - fail("Failed initialize JMX for testing: " + e); - } - LOG.info("Cluster startup complete"); - } - - private void startUpCluster(boolean hasTransientStorage, - final int ramDiskReplicaCapacity) - throws IOException { - startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false); - } - - private void makeTestFile(Path path, long length, final boolean isLazyPersist) - throws IOException { - - EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE); - - if (isLazyPersist) { - createFlags.add(LAZY_PERSIST); - } - - FSDataOutputStream fos = null; - try { - fos = - fs.create(path, - FsPermission.getFileDefault(), - createFlags, - BUFFER_LENGTH, - REPL_FACTOR, - BLOCK_SIZE, - null); - - // Allocate a block. - byte[] buffer = new byte[BUFFER_LENGTH]; - for (int bytesWritten = 0; bytesWritten < length; ) { - fos.write(buffer, 0, buffer.length); - bytesWritten += buffer.length; - } - if (length > 0) { - fos.hsync(); - } - } finally { - IOUtils.closeQuietly(fos); - } - } - - private LocatedBlocks ensureFileReplicasOnStorageType( - Path path, StorageType storageType) throws IOException { - // Ensure that returned block locations returned are correct! 
- LOG.info("Ensure path: " + path + " is on StorageType: " + storageType); - assertThat(fs.exists(path), is(true)); - long fileLength = client.getFileInfo(path.toString()).getLen(); - LocatedBlocks locatedBlocks = - client.getLocatedBlocks(path.toString(), 0, fileLength); - for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { - assertThat(locatedBlock.getStorageTypes()[0], is(storageType)); - } - return locatedBlocks; - } - - private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist, - long seed) throws IOException { - DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length, - BLOCK_SIZE, REPL_FACTOR, seed, true); - } - - private boolean verifyReadRandomFile( - Path path, int fileLength, int seed) throws IOException { - byte contents[] = DFSTestUtil.readFileBuffer(fs, path); - byte expected[] = DFSTestUtil. - calculateFileContentsFromSeed(seed, fileLength); - return Arrays.equals(contents, expected); - } - - private boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks) - throws IOException, InterruptedException { - - LOG.info("Verifying replica has no saved copy after deletion."); - triggerBlockReport(); - - while( - DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0)) - > 0L){ - Thread.sleep(1000); - } - - final String bpid = cluster.getNamesystem().getBlockPoolId(); - List<? extends FsVolumeSpi> volumes = - cluster.getDataNodes().get(0).getFSDataset().getVolumes(); - - // Make sure deleted replica does not have a copy on either finalized dir of - // transient volume or finalized dir of non-transient volume - for (FsVolumeSpi v : volumes) { - FsVolumeImpl volume = (FsVolumeImpl) v; - File targetDir = (v.isTransientStorage()) ? 
- volume.getBlockPoolSlice(bpid).getFinalizedDir() : - volume.getBlockPoolSlice(bpid).getLazypersistDir(); - if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) { - return false; - } - } - return true; - } - - private boolean verifyBlockDeletedFromDir(File dir, LocatedBlocks locatedBlocks) { - - for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { - File targetDir = - DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId()); - - File blockFile = new File(targetDir, lb.getBlock().getBlockName()); - if (blockFile.exists()) { - LOG.warn("blockFile: " + blockFile.getAbsolutePath() + - " exists after deletion."); - return false; - } - File metaFile = new File(targetDir, - DatanodeUtil.getMetaName(lb.getBlock().getBlockName(), - lb.getBlock().getGenerationStamp())); - if (metaFile.exists()) { - LOG.warn("metaFile: " + metaFile.getAbsolutePath() + - " exists after deletion."); - return false; - } - } - return true; - } - - private void triggerBlockReport() - throws IOException, InterruptedException { - // Trigger block report to NN - DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0)); - Thread.sleep(10 * 1000); - } - class WriterRunnable implements Runnable { private final int id; private final Path paths[]; @@ -960,27 +725,4 @@ public class TestLazyPersistFiles { } } } - - JMXGet initJMX() throws Exception - { - JMXGet jmx = new JMXGet(); - jmx.setService(JMX_SERVICE_NAME); - jmx.init(); - return jmx; - } - - void printRamDiskJMXMetrics() { - try { - if (jmx != null) { - jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - - void verifyRamDiskJMXMetric(String metricName, long expectedValue) - throws Exception { - assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName))); - } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java index b6ac287..efc6dcb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java @@ -15,84 +15,44 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; - import org.apache.commons.io.IOUtils; - import org.apache.commons.logging.Log; - import org.apache.commons.logging.LogFactory; - import org.apache.commons.logging.impl.Log4JLogger; - import org.apache.hadoop.conf.Configuration; - import org.apache.hadoop.fs.CreateFlag; - import org.apache.hadoop.fs.FSDataInputStream; - import org.apache.hadoop.fs.FSDataOutputStream; - import org.apache.hadoop.fs.Path; - import org.apache.hadoop.fs.permission.FsPermission; - import org.apache.hadoop.hdfs.*; - import org.apache.hadoop.hdfs.client.HdfsDataInputStream; - import org.apache.hadoop.hdfs.protocol.LocatedBlock; - import org.apache.hadoop.hdfs.protocol.LocatedBlocks; - import org.apache.hadoop.hdfs.server.datanode.DataNode; - import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; - import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; - import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; - import 
org.apache.hadoop.hdfs.server.namenode.NameNode; - import org.apache.hadoop.net.unix.DomainSocket; - import org.apache.hadoop.net.unix.TemporarySocketDirectory; - import org.apache.hadoop.security.UserGroupInformation; - import org.apache.hadoop.test.GenericTestUtils; - import org.apache.hadoop.util.NativeCodeLoader; - import org.apache.log4j.Level; - import org.junit.*; - - import java.io.File; - import java.io.IOException; - import java.util.Arrays; - import java.util.EnumSet; - import java.util.List; - import java.util.UUID; - - import static org.apache.hadoop.fs.CreateFlag.CREATE; - import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST; - import static org.apache.hadoop.hdfs.DFSConfigKeys.*; - import static org.apache.hadoop.hdfs.StorageType.DEFAULT; - import static org.apache.hadoop.hdfs.StorageType.RAM_DISK; - import static org.hamcrest.CoreMatchers.equalTo; - import static org.hamcrest.core.Is.is; - import static org.junit.Assert.assertThat; - -public class TestScrLazyPersistFiles { - public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class); - - static { - ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL); - ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL); - ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL); - } - - private static short REPL_FACTOR = 1; - private static final int BLOCK_SIZE = 10485760; // 10 MB - private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3; - private static final long HEARTBEAT_INTERVAL_SEC = 1; - private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500; - private static final int LAZY_WRITER_INTERVAL_SEC = 1; - private static final int BUFFER_LENGTH = 4096; - private static TemporarySocketDirectory sockDir; - - private MiniDFSCluster cluster; - private DistributedFileSystem fs; - private DFSClient client; - private Configuration conf; +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import 
org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.ClientContext; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.NativeCodeLoader; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.File; +import java.io.IOException; + +import static org.apache.hadoop.hdfs.StorageType.DEFAULT; +import static org.apache.hadoop.hdfs.StorageType.RAM_DISK; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public class TestScrLazyPersistFiles extends LazyPersistTestCase { @BeforeClass public static void init() { - sockDir = new TemporarySocketDirectory(); DomainSocket.disableBindPathValidation(); } - @AfterClass - public static void shutdown() throws IOException { - sockDir.close(); - } - @Before public void before() { Assume.assumeThat(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS, @@ -100,26 +60,14 @@ public class TestScrLazyPersistFiles { Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); } - @After - public void shutDownCluster() throws IOException { - if (fs != null) { - fs.close(); - fs = null; - client = null; - } - - if (cluster != null) { - cluster.shutdownDataNodes(); - cluster.shutdown(); - cluster = null; - } - } + @Rule + public ExpectedException exception = ExpectedException.none(); /** * Read in-memory block 
with Short Circuit Read * Note: the test uses faked RAM_DISK from physical disk. */ - @Test (timeout=300000) + @Test public void testRamDiskShortCircuitRead() throws IOException, InterruptedException { startUpCluster(REPL_FACTOR, @@ -160,7 +108,7 @@ public class TestScrLazyPersistFiles { * @throws IOException * @throws InterruptedException */ - @Test (timeout=300000000) + @Test public void testRamDiskEvictionWithShortCircuitReadHandle() throws IOException, InterruptedException { startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, @@ -204,123 +152,149 @@ public class TestScrLazyPersistFiles { ensureFileReplicasOnStorageType(path1, DEFAULT); } - // ---- Utility functions for all test cases ------------------------------- + @Test + public void testShortCircuitReadAfterEviction() + throws IOException, InterruptedException { + Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); + startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false); + doShortCircuitReadAfterEvictionTest(); + } - /** - * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially - * capped. If ramDiskStorageLimit < 0 then it is ignored. 
- */ - private void startUpCluster(final int numDataNodes, - final StorageType[] storageTypes, - final long ramDiskStorageLimit, - final boolean useSCR) - throws IOException { - - conf = new Configuration(); - conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); - conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, - LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC); - conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC); - conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, - HEARTBEAT_RECHECK_INTERVAL_MSEC); - conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, - LAZY_WRITER_INTERVAL_SEC); - - if (useSCR) - { - conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR); - conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, - UUID.randomUUID().toString()); - conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, - new File(sockDir.getDir(), - "TestShortCircuitLocalReadHandle._PORT.sock").getAbsolutePath()); - conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, - UserGroupInformation.getCurrentUser().getShortUserName()); - } + @Test + public void testLegacyShortCircuitReadAfterEviction() + throws IOException, InterruptedException { + startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true); + doShortCircuitReadAfterEvictionTest(); + } - REPL_FACTOR = 1; //Reset in case a test has modified the value - - cluster = new MiniDFSCluster - .Builder(conf) - .numDataNodes(numDataNodes) - .storageTypes(storageTypes != null ? storageTypes : new StorageType[] { DEFAULT, DEFAULT }) - .build(); - fs = cluster.getFileSystem(); - client = fs.getClient(); - - // Artificially cap the storage capacity of the RAM_DISK volume. - if (ramDiskStorageLimit >= 0) { - List<? 
extends FsVolumeSpi> volumes = - cluster.getDataNodes().get(0).getFSDataset().getVolumes(); - - for (FsVolumeSpi volume : volumes) { - if (volume.getStorageType() == RAM_DISK) { - ((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit); - } - } + private void doShortCircuitReadAfterEvictionTest() throws IOException, + InterruptedException { + final String METHOD_NAME = GenericTestUtils.getMethodName(); + Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); + Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); + + final int SEED = 0xFADED; + makeRandomTestFile(path1, BLOCK_SIZE, true, SEED); + + // Verify short-circuit read from RAM_DISK. + ensureFileReplicasOnStorageType(path1, RAM_DISK); + File metaFile = MiniDFSCluster.getBlockMetadataFile(0, + DFSTestUtil.getFirstBlock(fs, path1)); + assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize()); + assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED)); + + // Sleep for a short time to allow the lazy writer thread to do its job. + Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); + + // Verify short-circuit read from RAM_DISK once again. + ensureFileReplicasOnStorageType(path1, RAM_DISK); + metaFile = MiniDFSCluster.getBlockMetadataFile(0, + DFSTestUtil.getFirstBlock(fs, path1)); + assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize()); + assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED)); + + // Create another file with a replica on RAM_DISK, which evicts the first. + makeRandomTestFile(path2, BLOCK_SIZE, true, SEED); + Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); + triggerBlockReport(); + + // Verify short-circuit read still works from DEFAULT storage. This time, + // we'll have a checksum written during lazy persistence. 
+ ensureFileReplicasOnStorageType(path1, DEFAULT); + metaFile = MiniDFSCluster.getBlockMetadataFile(0, + DFSTestUtil.getFirstBlock(fs, path1)); + assertTrue(metaFile.length() > BlockMetadataHeader.getHeaderSize()); + assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED)); + + // In the implementation of legacy short-circuit reads, any failure is + // trapped silently, reverts back to a remote read, and also disables all + // subsequent legacy short-circuit reads in the ClientContext. If the test + // uses legacy, then assert that it didn't get disabled. + ClientContext clientContext = client.getClientContext(); + if (clientContext.getUseLegacyBlockReaderLocal()) { + Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal()); } + } - LOG.info("Cluster startup complete"); + @Test + public void testShortCircuitReadBlockFileCorruption() throws IOException, + InterruptedException { + Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); + startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false); + doShortCircuitReadBlockFileCorruptionTest(); } - private void makeTestFile(Path path, long length, final boolean isLazyPersist) - throws IOException { + @Test + public void testLegacyShortCircuitReadBlockFileCorruption() throws IOException, + InterruptedException { + startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true); + doShortCircuitReadBlockFileCorruptionTest(); + } - EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE); + public void doShortCircuitReadBlockFileCorruptionTest() throws IOException, + InterruptedException { + final String METHOD_NAME = GenericTestUtils.getMethodName(); + Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); + Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); - if (isLazyPersist) { - createFlags.add(LAZY_PERSIST); - } + final int SEED = 0xFADED; + makeRandomTestFile(path1, BLOCK_SIZE, true, SEED); + ensureFileReplicasOnStorageType(path1, RAM_DISK); - FSDataOutputStream fos = null; - try { - 
fos = - fs.create(path, - FsPermission.getFileDefault(), - createFlags, - BUFFER_LENGTH, - REPL_FACTOR, - BLOCK_SIZE, - null); - - // Allocate a block. - byte[] buffer = new byte[BUFFER_LENGTH]; - for (int bytesWritten = 0; bytesWritten < length; ) { - fos.write(buffer, 0, buffer.length); - bytesWritten += buffer.length; - } - if (length > 0) { - fos.hsync(); - } - } finally { - IOUtils.closeQuietly(fos); - } + // Create another file with a replica on RAM_DISK, which evicts the first. + makeRandomTestFile(path2, BLOCK_SIZE, true, SEED); + + // Sleep for a short time to allow the lazy writer thread to do its job. + Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); + triggerBlockReport(); + + // Corrupt the lazy-persisted block file, and verify that checksum + // verification catches it. + ensureFileReplicasOnStorageType(path1, DEFAULT); + MiniDFSCluster.corruptReplica(0, DFSTestUtil.getFirstBlock(fs, path1)); + exception.expect(ChecksumException.class); + DFSTestUtil.readFileBuffer(fs, path1); } - private LocatedBlocks ensureFileReplicasOnStorageType( - Path path, StorageType storageType) throws IOException { - // Ensure that returned block locations returned are correct! 
- LOG.info("Ensure path: " + path + " is on StorageType: " + storageType); - assertThat(fs.exists(path), is(true)); - long fileLength = client.getFileInfo(path.toString()).getLen(); - LocatedBlocks locatedBlocks = - client.getLocatedBlocks(path.toString(), 0, fileLength); - for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { - assertThat(locatedBlock.getStorageTypes()[0], is(storageType)); - } - return locatedBlocks; + @Test + public void testShortCircuitReadMetaFileCorruption() throws IOException, + InterruptedException { + Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); + startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false); + doShortCircuitReadMetaFileCorruptionTest(); } - private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist, - long seed) throws IOException { - DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length, - BLOCK_SIZE, REPL_FACTOR, seed, true); + @Test + public void testLegacyShortCircuitReadMetaFileCorruption() throws IOException, + InterruptedException { + startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true); + doShortCircuitReadMetaFileCorruptionTest(); } - private void triggerBlockReport() - throws IOException, InterruptedException { - // Trigger block report to NN - DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0)); - Thread.sleep(10 * 1000); + public void doShortCircuitReadMetaFileCorruptionTest() throws IOException, + InterruptedException { + final String METHOD_NAME = GenericTestUtils.getMethodName(); + Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); + Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); + + final int SEED = 0xFADED; + makeRandomTestFile(path1, BLOCK_SIZE, true, SEED); + ensureFileReplicasOnStorageType(path1, RAM_DISK); + + // Create another file with a replica on RAM_DISK, which evicts the first. 
+ makeRandomTestFile(path2, BLOCK_SIZE, true, SEED); + + // Sleep for a short time to allow the lazy writer thread to do its job. + Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); + triggerBlockReport(); + + // Corrupt the lazy-persisted checksum file, and verify that checksum + // verification catches it. + ensureFileReplicasOnStorageType(path1, DEFAULT); + File metaFile = MiniDFSCluster.getBlockMetadataFile(0, + DFSTestUtil.getFirstBlock(fs, path1)); + MiniDFSCluster.corruptBlock(metaFile); + exception.expect(ChecksumException.class); + DFSTestUtil.readFileBuffer(fs, path1); } }
