http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java deleted file mode 100644 index 9e78c10..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java +++ /dev/null @@ -1,551 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.net.InetSocketAddress; -import java.net.URL; -import java.util.Collections; -import java.util.HashSet; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.TimeoutException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; -import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.util.Time; -import org.apache.log4j.Level; -import org.junit.Test; - -/** - * This test verifies that block verification occurs on the datanode - */ -public class TestDatanodeBlockScanner { - - private static final Log LOG = - LogFactory.getLog(TestDatanodeBlockScanner.class); - - private static final long TIMEOUT = 20000; // 20 sec. - - private static final Pattern pattern = - Pattern.compile(".*?(blk_[-]*\\d+).*?scan time\\s*:\\s*(\\d+)"); - - private static final Pattern pattern_blockVerify = - Pattern.compile(".*?(SCAN_PERIOD)\\s*:\\s*(\\d+.*?)"); - - static { - ((Log4JLogger)FSNamesystem.auditLog).getLogger().setLevel(Level.WARN); - } - /** - * This connects to datanode and fetches block verification data. - * It repeats this until the given block has a verification time > newTime. - * @param newTime - validation timestamps before newTime are "old", the - * result of previous validations. This method waits until a "new" - * validation timestamp is obtained. If no validator runs soon - * enough, the method will time out. - * @return - the new validation timestamp - * @throws IOException - * @throws TimeoutException - */ - private static long waitForVerification(int infoPort, FileSystem fs, - Path file, int blocksValidated, - long newTime, long timeout) - throws IOException, TimeoutException { - URL url = new URL("http://localhost:" + infoPort + - "/blockScannerReport?listblocks"); - long lastWarnTime = Time.monotonicNow(); - if (newTime <= 0) newTime = 1L; - long verificationTime = 0; - - String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName(); - long failtime = (timeout <= 0) ? Long.MAX_VALUE - : Time.monotonicNow() + timeout; - while (verificationTime < newTime) { - if (failtime < Time.monotonicNow()) { - throw new TimeoutException("failed to achieve block verification after " - + timeout + " msec. Current verification timestamp = " - + verificationTime + ", requested verification time > " - + newTime); - } - String response = DFSTestUtil.urlGet(url); - if(blocksValidated >= 0) { - for(Matcher matcher = pattern_blockVerify.matcher(response); matcher.find();) { - if (block.equals(matcher.group(1))) { - assertEquals(1, blocksValidated); - break; - } - } - } - for(Matcher matcher = pattern.matcher(response); matcher.find();) { - if (block.equals(matcher.group(1))) { - verificationTime = Long.parseLong(matcher.group(2)); - break; - } - } - - if (verificationTime < newTime) { - long now = Time.monotonicNow(); - if ((now - lastWarnTime) >= 5*1000) { - LOG.info("Waiting for verification of " + block); - lastWarnTime = now; - } - try { - Thread.sleep(500); - } catch (InterruptedException ignored) {} - } - } - - return verificationTime; - } - - @Test - public void testDatanodeBlockScanner() throws IOException, TimeoutException { - long startTime = Time.monotonicNow(); - - Configuration conf = new HdfsConfiguration(); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); - cluster.waitActive(); - - FileSystem fs = cluster.getFileSystem(); - Path file1 = new Path("/tmp/testBlockVerification/file1"); - Path file2 = new Path("/tmp/testBlockVerification/file2"); - - /* - * Write the first file and restart the cluster. - */ - DFSTestUtil.createFile(fs, file1, 10, (short)1, 0); - cluster.shutdown(); - - cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(1) - .format(false).build(); - cluster.waitActive(); - - DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", - cluster.getNameNodePort()), conf); - fs = cluster.getFileSystem(); - DatanodeInfo dn = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0]; - - /* - * The cluster restarted. The block should be verified by now. - */ - assertTrue(waitForVerification(dn.getInfoPort(), fs, file1, 1, startTime, - TIMEOUT) >= startTime); - - /* - * Create a new file and read the block. The block should be marked - * verified since the client reads the block and verifies checksum. - */ - DFSTestUtil.createFile(fs, file2, 10, (short)1, 0); - IOUtils.copyBytes(fs.open(file2), new IOUtils.NullOutputStream(), - conf, true); - assertTrue(waitForVerification(dn.getInfoPort(), fs, file2, 2, startTime, - TIMEOUT) >= startTime); - - cluster.shutdown(); - } - - @Test - public void testBlockCorruptionPolicy() throws Exception { - Configuration conf = new HdfsConfiguration(); - conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L); - Random random = new Random(); - FileSystem fs = null; - int rand = random.nextInt(3); - - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); - cluster.waitActive(); - fs = cluster.getFileSystem(); - Path file1 = new Path("/tmp/testBlockVerification/file1"); - DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0); - ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1); - - DFSTestUtil.waitReplication(fs, file1, (short)3); - assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0)); - - // Corrupt random replica of block - assertTrue(cluster.corruptReplica(rand, block)); - - // Restart the datanode hoping the corrupt block to be reported - cluster.restartDataNode(rand); - - // We have 2 good replicas and block is not corrupt - DFSTestUtil.waitReplication(fs, file1, (short)2); - assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0)); - - // Corrupt all replicas. Now, block should be marked as corrupt - // and we should get all the replicas - assertTrue(cluster.corruptReplica(0, block)); - assertTrue(cluster.corruptReplica(1, block)); - assertTrue(cluster.corruptReplica(2, block)); - - // Trigger each of the DNs to scan this block immediately. - // The block pool scanner doesn't run frequently enough on its own - // to notice these, and due to HDFS-1371, the client won't report - // bad blocks to the NN when all replicas are bad. - for (DataNode dn : cluster.getDataNodes()) { - DataNodeTestUtils.runBlockScannerForBlock(dn, block); - } - - // We now have the blocks to be marked as corrupt and we get back all - // its replicas - DFSTestUtil.waitReplication(fs, file1, (short)3); - assertTrue(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0)); - cluster.shutdown(); - } - - /** - * testBlockCorruptionRecoveryPolicy. - * This tests recovery of corrupt replicas, first for one corrupt replica - * then for two. The test invokes blockCorruptionRecoveryPolicy which - * 1. Creates a block with desired number of replicas - * 2. Corrupts the desired number of replicas and restarts the datanodes - * containing the corrupt replica. Additionaly we also read the block - * in case restarting does not report corrupt replicas. - * Restarting or reading from the datanode would trigger reportBadBlocks - * to namenode. - * NameNode adds it to corruptReplicasMap and neededReplication - * 3. Test waits until all corrupt replicas are reported, meanwhile - * Re-replciation brings the block back to healthy state - * 4. Test again waits until the block is reported with expected number - * of good replicas. - */ - @Test - public void testBlockCorruptionRecoveryPolicy1() throws Exception { - // Test recovery of 1 corrupt replica - LOG.info("Testing corrupt replica recovery for one corrupt replica"); - blockCorruptionRecoveryPolicy(4, (short)3, 1); - } - - @Test - public void testBlockCorruptionRecoveryPolicy2() throws Exception { - // Test recovery of 2 corrupt replicas - LOG.info("Testing corrupt replica recovery for two corrupt replicas"); - blockCorruptionRecoveryPolicy(5, (short)3, 2); - } - - private void blockCorruptionRecoveryPolicy(int numDataNodes, - short numReplicas, - int numCorruptReplicas) - throws Exception { - Configuration conf = new HdfsConfiguration(); - conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 30L); - conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3); - conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L); - conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false); - conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5L); - - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); - cluster.waitActive(); - FileSystem fs = cluster.getFileSystem(); - Path file1 = new Path("/tmp/testBlockCorruptRecovery/file"); - DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0); - ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1); - final int ITERATIONS = 10; - - // Wait until block is replicated to numReplicas - DFSTestUtil.waitReplication(fs, file1, numReplicas); - - for (int k = 0; ; k++) { - // Corrupt numCorruptReplicas replicas of block - int[] corruptReplicasDNIDs = new int[numCorruptReplicas]; - for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) { - if (cluster.corruptReplica(i, block)) { - corruptReplicasDNIDs[j++] = i; - LOG.info("successfully corrupted block " + block + " on node " - + i + " " + cluster.getDataNodes().get(i).getDisplayName()); - } - } - - // Restart the datanodes containing corrupt replicas - // so they would be reported to namenode and re-replicated - // They MUST be restarted in reverse order from highest to lowest index, - // because the act of restarting them removes them from the ArrayList - // and causes the indexes of all nodes above them in the list to change. - for (int i = numCorruptReplicas - 1; i >= 0 ; i--) { - LOG.info("restarting node with corrupt replica: position " - + i + " node " + corruptReplicasDNIDs[i] + " " - + cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getDisplayName()); - cluster.restartDataNode(corruptReplicasDNIDs[i]); - } - - // Loop until all corrupt replicas are reported - try { - DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1, - block, numCorruptReplicas); - } catch(TimeoutException e) { - if (k > ITERATIONS) { - throw e; - } - LOG.info("Timed out waiting for corrupt replicas, trying again, iteration " + k); - continue; - } - break; - } - - // Loop until the block recovers after replication - DFSTestUtil.waitReplication(fs, file1, numReplicas); - assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0)); - - // Make sure the corrupt replica is invalidated and removed from - // corruptReplicasMap - DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1, - block, 0); - cluster.shutdown(); - } - - /** Test if NameNode handles truncated blocks in block report */ - @Test - public void testTruncatedBlockReport() throws Exception { - final Configuration conf = new HdfsConfiguration(); - final short REPLICATION_FACTOR = (short)2; - final Path fileName = new Path("/file1"); - - conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 3L); - conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3); - conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L); - conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false); - - long startTime = Time.monotonicNow(); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(REPLICATION_FACTOR) - .build(); - cluster.waitActive(); - - ExtendedBlock block; - try { - FileSystem fs = cluster.getFileSystem(); - DFSTestUtil.createFile(fs, fileName, 1, REPLICATION_FACTOR, 0); - DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR); - block = DFSTestUtil.getFirstBlock(fs, fileName); - } finally { - cluster.shutdown(); - } - - // Restart cluster and confirm block is verified on datanode 0, - // then truncate it on datanode 0. - cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(REPLICATION_FACTOR) - .format(false) - .build(); - cluster.waitActive(); - try { - FileSystem fs = cluster.getFileSystem(); - int infoPort = cluster.getDataNodes().get(0).getInfoPort(); - assertTrue(waitForVerification(infoPort, fs, fileName, 1, startTime, TIMEOUT) >= startTime); - - // Truncate replica of block - if (!changeReplicaLength(cluster, block, 0, -1)) { - throw new IOException( - "failed to find or change length of replica on node 0 " - + cluster.getDataNodes().get(0).getDisplayName()); - } - } finally { - cluster.shutdown(); - } - - // Restart the cluster, add a node, and check that the truncated block is - // handled correctly - cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(REPLICATION_FACTOR) - .format(false) - .build(); - cluster.startDataNodes(conf, 1, true, null, null); - cluster.waitActive(); // now we have 3 datanodes - - // Assure the cluster has left safe mode. - cluster.waitClusterUp(); - assertFalse("failed to leave safe mode", - cluster.getNameNode().isInSafeMode()); - - try { - // wait for truncated block be detected by block scanner, - // and the block to be replicated - DFSTestUtil.waitReplication( - cluster.getFileSystem(), fileName, REPLICATION_FACTOR); - - // Make sure that truncated block will be deleted - waitForBlockDeleted(cluster, block, 0, TIMEOUT); - } finally { - cluster.shutdown(); - } - } - - /** - * Change the length of a block at datanode dnIndex - */ - static boolean changeReplicaLength(MiniDFSCluster cluster, ExtendedBlock blk, - int dnIndex, int lenDelta) throws IOException { - File blockFile = cluster.getBlockFile(dnIndex, blk); - if (blockFile != null && blockFile.exists()) { - RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw"); - raFile.setLength(raFile.length()+lenDelta); - raFile.close(); - return true; - } - LOG.info("failed to change length of block " + blk); - return false; - } - - private static void waitForBlockDeleted(MiniDFSCluster cluster, - ExtendedBlock blk, int dnIndex, long timeout) throws TimeoutException, - InterruptedException { - File blockFile = cluster.getBlockFile(dnIndex, blk); - long failtime = Time.monotonicNow() - + ((timeout > 0) ? timeout : Long.MAX_VALUE); - while (blockFile != null && blockFile.exists()) { - if (failtime < Time.monotonicNow()) { - throw new TimeoutException("waited too long for blocks to be deleted: " - + blockFile.getPath() + (blockFile.exists() ? " still exists; " : " is absent; ")); - } - Thread.sleep(100); - blockFile = cluster.getBlockFile(dnIndex, blk); - } - } - - private static final String BASE_PATH = (new File("/data/current/finalized")) - .getAbsolutePath(); - - @Test - public void testReplicaInfoParsing() throws Exception { - testReplicaInfoParsingSingle(BASE_PATH); - testReplicaInfoParsingSingle(BASE_PATH + "/subdir1"); - testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir2/subdir3"); - } - - private static void testReplicaInfoParsingSingle(String subDirPath) { - File testFile = new File(subDirPath); - assertEquals(BASE_PATH, ReplicaInfo.parseBaseDir(testFile).baseDirPath); - } - - @Test - public void testDuplicateScans() throws Exception { - long startTime = Time.monotonicNow(); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration()) - .numDataNodes(1).build(); - FileSystem fs = null; - try { - fs = cluster.getFileSystem(); - DataNode dataNode = cluster.getDataNodes().get(0); - int infoPort = dataNode.getInfoPort(); - long scanTimeBefore = 0, scanTimeAfter = 0; - for (int i = 1; i < 10; i++) { - Path fileName = new Path("/test" + i); - DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L); - waitForVerification(infoPort, fs, fileName, i, startTime, TIMEOUT); - if (i > 1) { - scanTimeAfter = DataNodeTestUtils.getLatestScanTime(dataNode, - DFSTestUtil.getFirstBlock(fs, new Path("/test" + (i - 1)))); - assertFalse("scan time shoud not be 0", scanTimeAfter == 0); - assertEquals("There should not be duplicate scan", scanTimeBefore, - scanTimeAfter); - } - - scanTimeBefore = DataNodeTestUtils.getLatestScanTime(dataNode, - DFSTestUtil.getFirstBlock(fs, new Path("/test" + i))); - } - cluster.restartDataNode(0); - Thread.sleep(10000); - dataNode = cluster.getDataNodes().get(0); - scanTimeAfter = DataNodeTestUtils.getLatestScanTime(dataNode, - DFSTestUtil.getFirstBlock(fs, new Path("/test" + (9)))); - assertEquals("There should not be duplicate scan", scanTimeBefore, - scanTimeAfter); - } finally { - IOUtils.closeStream(fs); - cluster.shutdown(); - } - } - -/** - * This test verifies whether block is added to the first location of - * BlockPoolSliceScanner#blockInfoSet - */ - @Test - public void testAddBlockInfoToFirstLocation() throws Exception { - MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration()) - .numDataNodes(1).build(); - FileSystem fs = null; - try { - fs = cluster.getFileSystem(); - DataNode dataNode = cluster.getDataNodes().get(0); - // Creating a bunch of blocks - for (int i = 1; i < 10; i++) { - Path fileName = new Path("/test" + i); - DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L); - } - // Get block of the first file created (file1) - ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/test1")); - dataNode.getBlockScanner().setLastScanTimeDifference(block, 0); - // Let it sleep for more than 5 seconds so that BlockPoolSliceScanner can - // scan the first set of blocks - Thread.sleep(10000); - Long scanTime1Fortest1Block = DataNodeTestUtils.getLatestScanTime( - dataNode, block); - // Create another set of blocks - for (int i = 10; i < 20; i++) { - Path fileName = new Path("/test" + i); - DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L); - } - dataNode.getBlockScanner().addBlock(block, true); - // Sleep so that BlockPoolSliceScanner can scan the second set of blocks - // and one block which we scheduled to rescan - Thread.sleep(10000); - // Get the lastScanTime of all of the second set of blocks - Set<Long> lastScanTimeSet = new HashSet<Long>(); - for (int i = 10; i < 20; i++) { - long lastScanTime = DataNodeTestUtils.getLatestScanTime(dataNode, - DFSTestUtil.getFirstBlock(fs, new Path("/test" + i))); - lastScanTimeSet.add(lastScanTime); - } - Long scanTime2Fortest1Block = DataNodeTestUtils.getLatestScanTime( - dataNode, DFSTestUtil.getFirstBlock(fs, new Path("/test1"))); - Long minimumLastScanTime = Collections.min(lastScanTimeSet); - assertTrue("The second scanTime for test1 block should be greater than " - + "first scanTime", scanTime2Fortest1Block > scanTime1Fortest1Block); - assertTrue("The second scanTime for test1 block should be less than or" - + " equal to minimum of the lastScanTime of second set of blocks", - scanTime2Fortest1Block <= minimumLastScanTime); - } finally { - IOUtils.closeStream(fs); - cluster.shutdown(); - } - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java index b88b5c2..d116f82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java @@ -444,8 +444,7 @@ public class TestReplication { // Change the length of a replica for (int i=0; i<cluster.getDataNodes().size(); i++) { - if (TestDatanodeBlockScanner.changeReplicaLength(cluster, block, i, - lenDelta)) { + if (DFSTestUtil.changeReplicaLength(cluster, block, i, lenDelta)) { break; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java index f8f476d..2942d0f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; -import org.apache.hadoop.hdfs.TestDatanodeBlockScanner; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -54,6 +53,7 @@ public class TestOverReplicatedBlocks { @Test public void testProcesOverReplicateBlock() throws Exception { Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L); conf.set( DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, @@ -71,13 +71,14 @@ public class TestOverReplicatedBlocks { assertTrue(cluster.corruptReplica(0, block)); DataNodeProperties dnProps = cluster.stopDataNode(0); // remove block scanner log to trigger block scanning - File scanLog = new File(MiniDFSCluster.getFinalizedDir( + File scanCursor = new File(new File(MiniDFSCluster.getFinalizedDir( cluster.getInstanceStorageDir(0, 0), - cluster.getNamesystem().getBlockPoolId()).getParent().toString() - + "/../dncp_block_verification.log.prev"); + cluster.getNamesystem().getBlockPoolId()).getParent()).getParent(), + "scanner.cursor"); //wait for one minute for deletion to succeed; - for(int i=0; !scanLog.delete(); i++) { - assertTrue("Could not delete log file in one minute", i < 60); + for(int i = 0; !scanCursor.delete(); i++) { + assertTrue("Could not delete " + scanCursor.getAbsolutePath() + + " in one minute", i < 60); try { Thread.sleep(1000); } catch (InterruptedException ignored) {} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java index e9557da..68c66a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; @@ -82,8 +83,8 @@ public abstract class BlockReportTestBase { private static short REPL_FACTOR = 1; private static final int RAND_LIMIT = 2000; - private static final long DN_RESCAN_INTERVAL = 5000; - private static final long DN_RESCAN_EXTRA_WAIT = 2 * DN_RESCAN_INTERVAL; + private static final long DN_RESCAN_INTERVAL = 1; + private static final long DN_RESCAN_EXTRA_WAIT = 3 * DN_RESCAN_INTERVAL; private static final int DN_N0 = 0; private static final int FILE_START = 0; @@ -294,7 +295,7 @@ public abstract class BlockReportTestBase { } } - waitTil(DN_RESCAN_EXTRA_WAIT); + waitTil(TimeUnit.SECONDS.toMillis(DN_RESCAN_EXTRA_WAIT)); // all blocks belong to the same file, hence same BP String poolId = cluster.getNamesystem().getBlockPoolId(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java index f50afd4..fd51e52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java @@ -113,30 +113,6 @@ public class DataNodeTestUtils { return DataNode.createInterDataNodeProtocolProxy(datanodeid, conf, dn.getDnConf().socketTimeout, dn.getDnConf().connectToDnViaHostname); } - - public static void runBlockScannerForBlock(DataNode dn, ExtendedBlock b) { - BlockPoolSliceScanner bpScanner = getBlockPoolScanner(dn, b); - bpScanner.verifyBlock(new ExtendedBlock(b.getBlockPoolId(), - new BlockPoolSliceScanner.BlockScanInfo(b.getLocalBlock()))); - } - - private static BlockPoolSliceScanner getBlockPoolScanner(DataNode dn, - ExtendedBlock b) { - DataBlockScanner scanner = dn.getBlockScanner(); - BlockPoolSliceScanner bpScanner = scanner.getBPScanner(b.getBlockPoolId()); - return bpScanner; - } - - public static long getLatestScanTime(DataNode dn, ExtendedBlock b) { - BlockPoolSliceScanner scanner = getBlockPoolScanner(dn, b); - return scanner.getLastScanTime(b.getLocalBlock()); - } - - public static void shutdownBlockScanner(DataNode dn) { - if (dn.blockScanner != null) { - dn.blockScanner.shutdown(); - } - } /** * This method is used for testing. http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 16b6350..0610b94 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -50,7 +50,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; @@ -484,6 +483,22 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { @Override public void releaseReservedSpace(long bytesToRelease) { } + + @Override + public BlockIterator newBlockIterator(String bpid, String name) { + throw new UnsupportedOperationException(); + } + + @Override + public BlockIterator loadBlockIterator(String bpid, String name) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public FsDatasetSpi getDataset() { + throw new UnsupportedOperationException(); + } } private final Map<String, Map<Block, BInfo>> blockMap @@ -1238,11 +1253,6 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { } @Override - public RollingLogs createRollingLogs(String bpid, String prefix) { - throw new UnsupportedOperationException(); - } - - @Override public FsVolumeSpi getVolume(ExtendedBlock b) { return volume; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java new file mode 100644 index 0000000..7eaa2bf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java @@ -0,0 +1,680 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND; +import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS; +import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER; +import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; + +import com.google.common.base.Supplier; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; +import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.Statistics; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; +import org.apache.log4j.Level; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestBlockScanner { + public static final Logger LOG = + LoggerFactory.getLogger(TestBlockScanner.class); + + @Before + public void before() { + BlockScanner.Conf.allowUnitTestSettings = true; + GenericTestUtils.setLogLevel(BlockScanner.LOG, Level.ALL); + GenericTestUtils.setLogLevel(VolumeScanner.LOG, Level.ALL); + GenericTestUtils.setLogLevel(FsVolumeImpl.LOG, Level.ALL); + } + + private static void disableBlockScanner(Configuration conf) { + conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 0L); + } + + private static class TestContext implements Closeable { + final int numNameServices; + final MiniDFSCluster cluster; + final DistributedFileSystem[] dfs; + final String[] bpids; + final DataNode datanode; + final BlockScanner blockScanner; + final FsDatasetSpi<? extends FsVolumeSpi> data; + final List<? extends FsVolumeSpi> volumes; + + TestContext(Configuration conf, int numNameServices) throws Exception { + this.numNameServices = numNameServices; + MiniDFSCluster.Builder bld = new MiniDFSCluster.Builder(conf). + numDataNodes(1). + storagesPerDatanode(1); + if (numNameServices > 1) { + bld.nnTopology(MiniDFSNNTopology. + simpleFederatedTopology(numNameServices)); + } + cluster = bld.build(); + cluster.waitActive(); + dfs = new DistributedFileSystem[numNameServices]; + for (int i = 0; i < numNameServices; i++) { + dfs[i] = cluster.getFileSystem(i); + } + bpids = new String[numNameServices]; + for (int i = 0; i < numNameServices; i++) { + bpids[i] = cluster.getNamesystem(i).getBlockPoolId(); + } + datanode = cluster.getDataNodes().get(0); + blockScanner = datanode.getBlockScanner(); + for (int i = 0; i < numNameServices; i++) { + dfs[i].mkdirs(new Path("/test")); + } + data = datanode.getFSDataset(); + volumes = data.getVolumes(); + } + + @Override + public void close() throws IOException { + if (cluster != null) { + for (int i = 0; i < numNameServices; i++) { + dfs[i].delete(new Path("/test"), true); + } + cluster.shutdown(); + } + } + + public void createFiles(int nsIdx, int numFiles, int length) + throws Exception { + for (int blockIdx = 0; blockIdx < numFiles; blockIdx++) { + DFSTestUtil.createFile(dfs[nsIdx], getPath(blockIdx), length, + (short)1, 123L); + } + } + + public Path getPath(int fileIdx) { + return new Path("/test/" + fileIdx); + } + + public ExtendedBlock getFileBlock(int nsIdx, int fileIdx) + throws Exception { + return DFSTestUtil.getFirstBlock(dfs[nsIdx], getPath(fileIdx)); + } + } + + /** + * Test iterating through a bunch of blocks in a volume using a volume + * iterator.<p/> + * + * We will rewind the iterator when about halfway through the blocks. + * + * @param numFiles The number of files to create. + * @param maxStaleness The maximum staleness to allow with the iterator. + * @throws Exception + */ + private void testVolumeIteratorImpl(int numFiles, + long maxStaleness) throws Exception { + Configuration conf = new Configuration(); + disableBlockScanner(conf); + TestContext ctx = new TestContext(conf, 1); + ctx.createFiles(0, numFiles, 1); + assertEquals(1, ctx.volumes.size()); + FsVolumeSpi volume = ctx.volumes.get(0); + ExtendedBlock savedBlock = null, loadedBlock = null; + boolean testedRewind = false, testedSave = false, testedLoad = false; + int blocksProcessed = 0, savedBlocksProcessed = 0; + try { + BPOfferService bpos[] = ctx.datanode.getAllBpOs(); + assertEquals(1, bpos.length); + BlockIterator iter = volume.newBlockIterator(ctx.bpids[0], "test"); + assertEquals(ctx.bpids[0], iter.getBlockPoolId()); + iter.setMaxStalenessMs(maxStaleness); + while (true) { + HashSet<ExtendedBlock> blocks = new HashSet<ExtendedBlock>(); + for (int blockIdx = 0; blockIdx < numFiles; blockIdx++) { + blocks.add(ctx.getFileBlock(0, blockIdx)); + } + while (true) { + ExtendedBlock block = iter.nextBlock(); + if (block == null) { + break; + } + blocksProcessed++; + LOG.info("BlockIterator for {} found block {}, blocksProcessed = {}", + volume, block, blocksProcessed); + if (testedSave && (savedBlock == null)) { + savedBlock = block; + } + if (testedLoad && (loadedBlock == null)) { + loadedBlock = block; + // The block that we get back right after loading the iterator + // should be the same block we got back right after saving + // the iterator. + assertEquals(savedBlock, loadedBlock); + } + boolean blockRemoved = blocks.remove(block); + assertTrue("Found unknown block " + block, blockRemoved); + if (blocksProcessed > (numFiles / 3)) { + if (!testedSave) { + LOG.info("Processed {} blocks out of {}. Saving iterator.", + blocksProcessed, numFiles); + iter.save(); + testedSave = true; + savedBlocksProcessed = blocksProcessed; + } + } + if (blocksProcessed > (numFiles / 2)) { + if (!testedRewind) { + LOG.info("Processed {} blocks out of {}. Rewinding iterator.", + blocksProcessed, numFiles); + iter.rewind(); + break; + } + } + if (blocksProcessed > ((2 * numFiles) / 3)) { + if (!testedLoad) { + LOG.info("Processed {} blocks out of {}. Loading iterator.", + blocksProcessed, numFiles); + iter = volume.loadBlockIterator(ctx.bpids[0], "test"); + iter.setMaxStalenessMs(maxStaleness); + break; + } + } + } + if (!testedRewind) { + testedRewind = true; + blocksProcessed = 0; + LOG.info("Starting again at the beginning..."); + continue; + } + if (!testedLoad) { + testedLoad = true; + blocksProcessed = savedBlocksProcessed; + LOG.info("Starting again at the load point..."); + continue; + } + assertEquals(numFiles, blocksProcessed); + break; + } + } finally { + ctx.close(); + } + } + + @Test(timeout=60000) + public void testVolumeIteratorWithoutCaching() throws Exception { + testVolumeIteratorImpl(5, 0); + } + + @Test(timeout=60000) + public void testVolumeIteratorWithCaching() throws Exception { + testVolumeIteratorImpl(600, 100); + } + + @Test(timeout=60000) + public void testDisableVolumeScanner() throws Exception { + Configuration conf = new Configuration(); + disableBlockScanner(conf); + TestContext ctx = new TestContext(conf, 1); + try { + Assert.assertFalse(ctx.datanode.getBlockScanner().isEnabled()); + } finally { + ctx.close(); + } + } + + public static class TestScanResultHandler extends ScanResultHandler { + static class Info { + boolean shouldRun = false; + final Set<ExtendedBlock> badBlocks = new HashSet<ExtendedBlock>(); + final Set<ExtendedBlock> goodBlocks = new HashSet<ExtendedBlock>(); + long blocksScanned = 0; + Semaphore sem = null; + } + + private VolumeScanner scanner; + + final static ConcurrentHashMap<String, Info> infos = + new ConcurrentHashMap<String, Info>(); + + static Info getInfo(FsVolumeSpi volume) { + Info newInfo = new Info(); + Info prevInfo = infos. + putIfAbsent(volume.getStorageID(), newInfo); + return prevInfo == null ? newInfo : prevInfo; + } + + @Override + public void setup(VolumeScanner scanner) { + this.scanner = scanner; + Info info = getInfo(scanner.volume); + LOG.info("about to start scanning."); + synchronized (info) { + while (!info.shouldRun) { + try { + info.wait(); + } catch (InterruptedException e) { + } + } + } + LOG.info("starting scanning."); + } + + @Override + public void handle(ExtendedBlock block, IOException e) { + LOG.info("handling block {} (exception {})", block, e); + Info info = getInfo(scanner.volume); + Semaphore sem; + synchronized (info) { + sem = info.sem; + } + if (sem != null) { + try { + sem.acquire(); + } catch (InterruptedException ie) { + throw new RuntimeException("interrupted"); + } + } + synchronized (info) { + if (!info.shouldRun) { + throw new RuntimeException("stopping volumescanner thread."); + } + if (e == null) { + info.goodBlocks.add(block); + } else { + info.badBlocks.add(block); + } + info.blocksScanned++; + } + } + } + + private void testScanAllBlocksImpl(final boolean rescan) throws Exception { + Configuration conf = new Configuration(); + conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 1048576L); + if (rescan) { + conf.setLong(INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS, 100L); + } else { + conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L); + } + conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER, + TestScanResultHandler.class.getName()); + final TestContext ctx = new TestContext(conf, 1); + final int NUM_EXPECTED_BLOCKS = 10; + ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1); + final Set<ExtendedBlock> expectedBlocks = new HashSet<ExtendedBlock>(); + for (int i = 0; i < NUM_EXPECTED_BLOCKS; i++) { + expectedBlocks.add(ctx.getFileBlock(0, i)); + } + TestScanResultHandler.Info info = + TestScanResultHandler.getInfo(ctx.volumes.get(0)); + synchronized (info) { + info.shouldRun = true; + info.notify(); + } + GenericTestUtils.waitFor(new Supplier<Boolean>(){ + @Override + public Boolean get() { + TestScanResultHandler.Info info = + TestScanResultHandler.getInfo(ctx.volumes.get(0)); + int numFoundBlocks = 0; + StringBuilder foundBlocksBld = new StringBuilder(); + String prefix = ""; + synchronized (info) { + for (ExtendedBlock block : info.goodBlocks) { + assertTrue(expectedBlocks.contains(block)); + numFoundBlocks++; + foundBlocksBld.append(prefix).append(block); + prefix = ", "; + } + LOG.info("numFoundBlocks = {}. blocksScanned = {}. Found blocks {}", + numFoundBlocks, info.blocksScanned, foundBlocksBld.toString()); + if (rescan) { + return (numFoundBlocks == NUM_EXPECTED_BLOCKS) && + (info.blocksScanned >= 2 * NUM_EXPECTED_BLOCKS); + } else { + return numFoundBlocks == NUM_EXPECTED_BLOCKS; + } + } + } + }, 10, 60000); + if (!rescan) { + synchronized (info) { + assertEquals(NUM_EXPECTED_BLOCKS, info.blocksScanned); + } + Statistics stats = ctx.blockScanner.getVolumeStats( + ctx.volumes.get(0).getStorageID()); + assertEquals(5 * NUM_EXPECTED_BLOCKS, stats.bytesScannedInPastHour); + assertEquals(NUM_EXPECTED_BLOCKS, stats.blocksScannedSinceRestart); + assertEquals(NUM_EXPECTED_BLOCKS, stats.blocksScannedInCurrentPeriod); + assertEquals(0, stats.scanErrorsSinceRestart); + assertEquals(1, stats.scansSinceRestart); + } + ctx.close(); + } + + /** + * Test scanning all blocks. Set the scan period high enough that + * we shouldn't rescan any block during this test. + */ + @Test(timeout=60000) + public void testScanAllBlocksNoRescan() throws Exception { + testScanAllBlocksImpl(false); + } + + /** + * Test scanning all blocks. Set the scan period high enough that + * we should rescan all blocks at least twice during this test. + */ + @Test(timeout=60000) + public void testScanAllBlocksWithRescan() throws Exception { + testScanAllBlocksImpl(true); + } + + /** + * Test that we don't scan too many blocks per second. + */ + @Test(timeout=120000) + public void testScanRateLimit() throws Exception { + Configuration conf = new Configuration(); + // Limit scan bytes per second dramatically + conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 4096L); + // Scan continuously + conf.setLong(INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS, 1L); + conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER, + TestScanResultHandler.class.getName()); + final TestContext ctx = new TestContext(conf, 1); + final int NUM_EXPECTED_BLOCKS = 5; + ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 4096); + final TestScanResultHandler.Info info = + TestScanResultHandler.getInfo(ctx.volumes.get(0)); + long startMs = Time.monotonicNow(); + synchronized (info) { + info.shouldRun = true; + info.notify(); + } + Thread.sleep(5000); + synchronized (info) { + long endMs = Time.monotonicNow(); + // Should scan no more than one block a second. + long maxBlocksScanned = ((endMs + 999 - startMs) / 1000); + assertTrue(info.blocksScanned < maxBlocksScanned); + } + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + synchronized (info) { + return info.blocksScanned > 0; + } + } + }, 1, 30000); + ctx.close(); + } + + @Test(timeout=120000) + public void testCorruptBlockHandling() throws Exception { + Configuration conf = new Configuration(); + conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L); + conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER, + TestScanResultHandler.class.getName()); + final TestContext ctx = new TestContext(conf, 1); + final int NUM_EXPECTED_BLOCKS = 5; + final int CORRUPT_INDEX = 3; + ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 4); + ExtendedBlock badBlock = ctx.getFileBlock(0, CORRUPT_INDEX); + ctx.cluster.corruptBlockOnDataNodes(badBlock); + final TestScanResultHandler.Info info = + TestScanResultHandler.getInfo(ctx.volumes.get(0)); + synchronized (info) { + info.shouldRun = true; + info.notify(); + } + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + synchronized (info) { + return info.blocksScanned == NUM_EXPECTED_BLOCKS; + } + } + }, 3, 30000); + synchronized (info) { + assertTrue(info.badBlocks.contains(badBlock)); + for (int i = 0; i < NUM_EXPECTED_BLOCKS; i++) { + if (i != CORRUPT_INDEX) { + ExtendedBlock block = ctx.getFileBlock(0, i); + assertTrue(info.goodBlocks.contains(block)); + } + } + } + ctx.close(); + } + + /** + * Test that we save the scan cursor when shutting down the datanode, and + * restart scanning from there when the datanode is restarted. + */ + @Test(timeout=120000) + public void testDatanodeCursor() throws Exception { + Configuration conf = new Configuration(); + conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L); + conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER, + TestScanResultHandler.class.getName()); + conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L); + final TestContext ctx = new TestContext(conf, 1); + final int NUM_EXPECTED_BLOCKS = 10; + ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1); + final TestScanResultHandler.Info info = + TestScanResultHandler.getInfo(ctx.volumes.get(0)); + synchronized (info) { + info.sem = new Semaphore(5); + info.shouldRun = true; + info.notify(); + } + // Scan the first 5 blocks + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + synchronized (info) { + return info.blocksScanned == 5; + } + } + }, 3, 30000); + synchronized (info) { + assertEquals(5, info.goodBlocks.size()); + assertEquals(5, info.blocksScanned); + info.shouldRun = false; + } + ctx.datanode.shutdown(); + String vPath = ctx.volumes.get(0).getBasePath(); + File cursorPath = new File(new File(new File(vPath, "current"), + ctx.bpids[0]), "scanner.cursor"); + assertTrue("Failed to find cursor save file in " + + cursorPath.getAbsolutePath(), cursorPath.exists()); + Set<ExtendedBlock> prevGoodBlocks = new HashSet<ExtendedBlock>(); + synchronized (info) { + info.sem = new Semaphore(4); + prevGoodBlocks.addAll(info.goodBlocks); + info.goodBlocks.clear(); + } + + // The block that we were scanning when we shut down the DN won't get + // recorded. + // After restarting the datanode, we should scan the next 4 blocks. + ctx.cluster.restartDataNode(0); + synchronized (info) { + info.shouldRun = true; + info.notify(); + } + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + synchronized (info) { + if (info.blocksScanned != 9) { + LOG.info("Waiting for blocksScanned to reach 9. It is at {}", + info.blocksScanned); + } + return info.blocksScanned == 9; + } + } + }, 3, 30000); + synchronized (info) { + assertEquals(4, info.goodBlocks.size()); + info.goodBlocks.addAll(prevGoodBlocks); + assertEquals(9, info.goodBlocks.size()); + assertEquals(9, info.blocksScanned); + } + ctx.datanode.shutdown(); + + // After restarting the datanode, we should not scan any more blocks. + // This is because we reached the end of the block pool earlier, and + // the scan period is much, much longer than the test time. + synchronized (info) { + info.sem = null; + info.shouldRun = false; + info.goodBlocks.clear(); + } + ctx.cluster.restartDataNode(0); + synchronized (info) { + info.shouldRun = true; + info.notify(); + } + Thread.sleep(3000); + synchronized (info) { + assertTrue(info.goodBlocks.isEmpty()); + } + ctx.close(); + } + + @Test(timeout=120000) + public void testMultipleBlockPoolScanning() throws Exception { + Configuration conf = new Configuration(); + conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L); + conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER, + TestScanResultHandler.class.getName()); + final TestContext ctx = new TestContext(conf, 3); + + // We scan 5 bytes per file (1 byte in file, 4 bytes of checksum) + final int BYTES_SCANNED_PER_FILE = 5; + final int NUM_FILES[] = new int[] { 1, 5, 10 }; + int TOTAL_FILES = 0; + for (int i = 0; i < NUM_FILES.length; i++) { + TOTAL_FILES += NUM_FILES[i]; + } + ctx.createFiles(0, NUM_FILES[0], 1); + ctx.createFiles(0, NUM_FILES[1], 1); + ctx.createFiles(0, NUM_FILES[2], 1); + + // start scanning + final TestScanResultHandler.Info info = + TestScanResultHandler.getInfo(ctx.volumes.get(0)); + synchronized (info) { + info.shouldRun = true; + info.notify(); + } + + // Wait for all the block pools to be scanned. + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + synchronized (info) { + Statistics stats = ctx.blockScanner.getVolumeStats( + ctx.volumes.get(0).getStorageID()); + if (stats.scansSinceRestart < 3) { + LOG.info("Waiting for scansSinceRestart to reach 3 (it is {})", + stats.scansSinceRestart); + return false; + } + if (!stats.eof) { + LOG.info("Waiting for eof."); + return false; + } + return true; + } + } + }, 3, 30000); + + Statistics stats = ctx.blockScanner.getVolumeStats( + ctx.volumes.get(0).getStorageID()); + assertEquals(TOTAL_FILES, stats.blocksScannedSinceRestart); + assertEquals(BYTES_SCANNED_PER_FILE * TOTAL_FILES, + stats.bytesScannedInPastHour); + ctx.close(); + } + + @Test(timeout=120000) + public void testNextSorted() throws Exception { + List<String> arr = new LinkedList<String>(); + arr.add("1"); + arr.add("3"); + arr.add("5"); + arr.add("7"); + Assert.assertEquals("3", FsVolumeImpl.nextSorted(arr, "2")); + Assert.assertEquals("3", FsVolumeImpl.nextSorted(arr, "1")); + Assert.assertEquals("1", FsVolumeImpl.nextSorted(arr, "")); + Assert.assertEquals("1", FsVolumeImpl.nextSorted(arr, null)); + Assert.assertEquals(null, FsVolumeImpl.nextSorted(arr, "9")); + } + + @Test(timeout=120000) + public void testCalculateNeededBytesPerSec() throws Exception { + // If we didn't check anything the last hour, we should scan now. + Assert.assertTrue( + VolumeScanner.calculateShouldScan(100, 0)); + + // If, on average, we checked 101 bytes/s checked during the last hour, + // stop checking now. + Assert.assertFalse( + VolumeScanner.calculateShouldScan(100, 101 * 3600)); + + // Target is 1 byte / s, but we didn't scan anything in the last minute. + // Should scan now. + Assert.assertTrue( + VolumeScanner.calculateShouldScan(1, 3540)); + + // Target is 1000000 byte / s, but we didn't scan anything in the last + // minute. Should scan now. + Assert.assertTrue( + VolumeScanner.calculateShouldScan(100000L, 354000000L)); + + Assert.assertFalse( + VolumeScanner.calculateShouldScan(100000L, 365000000L)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index 1b8f243..82a1684 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -589,6 +589,22 @@ public class TestDirectoryScanner { @Override public void releaseReservedSpace(long bytesToRelease) { } + + @Override + public BlockIterator newBlockIterator(String bpid, String name) { + throw new UnsupportedOperationException(); + } + + @Override + public BlockIterator loadBlockIterator(String bpid, String name) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public FsDatasetSpi getDataset() { + throw new UnsupportedOperationException(); + } } private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java deleted file mode 100644 index 55b1739..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java +++ /dev/null @@ -1,245 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.datanode; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.MiniDFSNNTopology; -import static org.apache.hadoop.hdfs.server.datanode.DataBlockScanner.SLEEP_PERIOD_MS; -import org.apache.log4j.Level; -import org.junit.Assert; -import org.junit.Test; -import static org.junit.Assert.fail; - - -public class TestMultipleNNDataBlockScanner { - private static final Log LOG = - LogFactory.getLog(TestMultipleNNDataBlockScanner.class); - Configuration conf; - MiniDFSCluster cluster = null; - final String[] bpids = new String[3]; - final FileSystem[] fs = new FileSystem[3]; - - public void setUp() throws IOException { - conf = new HdfsConfiguration(); - conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100); - conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 100); - cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3)) - .build(); - for (int i = 0; i < 3; i++) { - cluster.waitActive(i); - } - for (int i = 0; i < 3; i++) { - bpids[i] = cluster.getNamesystem(i).getBlockPoolId(); - } - for (int i = 0; i < 3; i++) { - fs[i] = cluster.getFileSystem(i); - } - // Create 2 files on each namenode with 10 blocks each - for (int i = 0; i < 3; i++) { - DFSTestUtil.createFile(fs[i], new Path("file1"), 1000, (short) 1, 0); - DFSTestUtil.createFile(fs[i], new Path("file2"), 1000, (short) 1, 1); - } - } - - @Test(timeout=120000) - public void testDataBlockScanner() throws IOException, InterruptedException { - setUp(); - try { - DataNode dn = cluster.getDataNodes().get(0); - for (int i = 0; i < 3; i++) { - long blocksScanned = 0; - while (blocksScanned != 20) { - blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpids[i]); - LOG.info("Waiting for all blocks to be scanned for bpid=" + bpids[i] - + "; Scanned so far=" + blocksScanned); - Thread.sleep(5000); - } - } - - StringBuilder buffer = new StringBuilder(); - dn.blockScanner.printBlockReport(buffer, false); - LOG.info("Block Report\n" + buffer.toString()); - } finally { - cluster.shutdown(); - } - } - - @Test(timeout=120000) - public void testBlockScannerAfterRefresh() throws IOException, - InterruptedException { - setUp(); - try { - Configuration dnConf = cluster.getDataNodes().get(0).getConf(); - Configuration conf = new HdfsConfiguration(dnConf); - StringBuilder namenodesBuilder = new StringBuilder(); - - String bpidToShutdown = cluster.getNamesystem(2).getBlockPoolId(); - for (int i = 0; i < 2; i++) { - String nsId = DFSUtil.getNamenodeNameServiceId(cluster - .getConfiguration(i)); - namenodesBuilder.append(nsId); - namenodesBuilder.append(","); - } - - conf.set(DFSConfigKeys.DFS_NAMESERVICES, namenodesBuilder - .toString()); - DataNode dn = cluster.getDataNodes().get(0); - dn.refreshNamenodes(conf); - - try { - while (true) { - dn.blockScanner.getBlocksScannedInLastRun(bpidToShutdown); - Thread.sleep(1000); - } - } catch (IOException ex) { - // Expected - LOG.info(ex.getMessage()); - } - - namenodesBuilder.append(DFSUtil.getNamenodeNameServiceId(cluster - .getConfiguration(2))); - conf.set(DFSConfigKeys.DFS_NAMESERVICES, namenodesBuilder - .toString()); - dn.refreshNamenodes(conf); - - for (int i = 0; i < 3; i++) { - long blocksScanned = 0; - while (blocksScanned != 20) { - blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpids[i]); - LOG.info("Waiting for all blocks to be scanned for bpid=" + bpids[i] - + "; Scanned so far=" + blocksScanned); - Thread.sleep(5000); - } - } - } finally { - cluster.shutdown(); - } - } - - @Test(timeout=120000) - public void testBlockScannerAfterRestart() throws IOException, - InterruptedException { - setUp(); - try { - cluster.restartDataNode(0); - cluster.waitActive(); - DataNode dn = cluster.getDataNodes().get(0); - for (int i = 0; i < 3; i++) { - while (!dn.blockScanner.isInitialized(bpids[i])) { - Thread.sleep(1000); - } - long blocksScanned = 0; - while (blocksScanned != 20) { - if (dn.blockScanner != null) { - blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpids[i]); - LOG.info("Waiting for all blocks to be scanned for bpid=" - + bpids[i] + "; Scanned so far=" + blocksScanned); - } - Thread.sleep(5000); - } - } - } finally { - cluster.shutdown(); - } - } - - @Test(timeout=120000) - public void test2NNBlockRescanInterval() throws IOException { - ((Log4JLogger)BlockPoolSliceScanner.LOG).getLogger().setLevel(Level.ALL); - Configuration conf = new HdfsConfiguration(); - cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3)) - .build(); - - try { - FileSystem fs = cluster.getFileSystem(1); - Path file2 = new Path("/test/testBlockScanInterval"); - DFSTestUtil.createFile(fs, file2, 30, (short) 1, 0); - - fs = cluster.getFileSystem(0); - Path file1 = new Path("/test/testBlockScanInterval"); - DFSTestUtil.createFile(fs, file1, 30, (short) 1, 0); - for (int i = 0; i < 8; i++) { - LOG.info("Verifying that the blockscanner scans exactly once"); - waitAndScanBlocks(1, 1); - } - } finally { - cluster.shutdown(); - } - } - - /** - * HDFS-3828: DN rescans blocks too frequently - * - * @throws Exception - */ - @Test(timeout=120000) - public void testBlockRescanInterval() throws IOException { - ((Log4JLogger)BlockPoolSliceScanner.LOG).getLogger().setLevel(Level.ALL); - Configuration conf = new HdfsConfiguration(); - cluster = new MiniDFSCluster.Builder(conf).build(); - - try { - FileSystem fs = cluster.getFileSystem(); - Path file1 = new Path("/test/testBlockScanInterval"); - DFSTestUtil.createFile(fs, file1, 30, (short) 1, 0); - for (int i = 0; i < 4; i++) { - LOG.info("Verifying that the blockscanner scans exactly once"); - waitAndScanBlocks(1, 1); - } - } finally { - cluster.shutdown(); - } - } - - void waitAndScanBlocks(long scansLastRun, long scansTotal) - throws IOException { - // DataBlockScanner will run for every 5 seconds so we are checking for - // every 5 seconds - int n = 5; - String bpid = cluster.getNamesystem(0).getBlockPoolId(); - DataNode dn = cluster.getDataNodes().get(0); - long blocksScanned, total; - do { - try { - Thread.sleep(SLEEP_PERIOD_MS); - } catch (InterruptedException e) { - fail("Interrupted: " + e); - } - blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpid); - total = dn.blockScanner.getTotalScans(bpid); - LOG.info("bpid = " + bpid + " blocksScanned = " + blocksScanned + " total=" + total); - } while (n-- > 0 && (blocksScanned != scansLastRun || scansTotal != total)); - Assert.assertEquals(scansTotal, total); - Assert.assertEquals(scansLastRun, blocksScanned); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index f256ee6..8fd51d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; @@ -51,12 +50,6 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> { StorageType.DEFAULT); @Override - public RollingLogs createRollingLogs(String bpid, String prefix) - throws IOException { - return new ExternalRollingLogs(); - } - - @Override public List<ExternalVolumeImpl> getVolumes() { return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalRollingLogs.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalRollingLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalRollingLogs.java deleted file mode 100644 index c9fb7c8..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalRollingLogs.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.datanode.extdataset; - -import java.io.IOException; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; - -public class ExternalRollingLogs implements RollingLogs { - - private class ExternalLineIterator implements LineIterator { - @Override - public boolean isPrevious() { - return false; - } - - @Override - public boolean isLastReadFromPrevious() { - return false; - } - - @Override - public boolean hasNext() { - return false; - } - - @Override - public String next() { - return null; - } - - @Override - public void remove() { - } - - @Override - public void close() throws IOException { - } - } - - private class ExternalAppender implements Appender { - @Override - public Appendable append(CharSequence cs) throws IOException { - return null; - } - - @Override - public Appendable append(CharSequence cs, int i, int i1) - throws IOException { - return null; - } - - @Override - public Appendable append(char c) throws IOException { - return null; - } - - @Override - public void close() throws IOException { - } - } - - @Override - public LineIterator iterator(boolean skipPrevious) throws IOException { - return new ExternalLineIterator(); - } - - @Override - public Appender appender() { - return new ExternalAppender(); - } - - @Override - public boolean roll() throws IOException { - return false; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java index 857e946..0ea33bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.nio.channels.ClosedChannelException; import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; @@ -79,4 +80,20 @@ public class ExternalVolumeImpl implements FsVolumeSpi { @Override public void releaseReservedSpace(long bytesToRelease) { } + + @Override + public BlockIterator newBlockIterator(String bpid, String name) { + return null; + } + + @Override + public BlockIterator loadBlockIterator(String bpid, String name) + throws IOException { + return null; + } + + @Override + public FsDatasetSpi getDataset() { + return null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java index 791bb76..82a6951 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hdfs.server.datanode.Replica; import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; import org.junit.Test; /** @@ -80,14 +79,6 @@ public class TestExternalDataset { } /** - * Tests instantiating a RollingLogs subclass. - */ - @Test - public void testInstantiateRollingLogs() throws Throwable { - RollingLogs inst = new ExternalRollingLogs(); - } - - /** * Tests instantiating an FsVolumeSpi subclass. */ @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java index f92d949..691d390 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.server.datanode.BlockScanner; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; @@ -41,16 +43,21 @@ public class FsVolumeListTest { new RoundRobinVolumeChoosingPolicy<>(); private FsDatasetImpl dataset = null; private String baseDir; + private BlockScanner blockScanner; @Before public void setUp() { dataset = mock(FsDatasetImpl.class); baseDir = new FileSystemTestHelper().getTestRootDir(); + Configuration blockScannerConf = new Configuration(); + blockScannerConf.setInt(DFSConfigKeys. + DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); + blockScanner = new BlockScanner(null, blockScannerConf); } @Test public void testGetNextVolumeWithClosedVolume() throws IOException { - FsVolumeList volumeList = new FsVolumeList(0, blockChooser); + FsVolumeList volumeList = new FsVolumeList(0, blockScanner, blockChooser); List<FsVolumeImpl> volumes = new ArrayList<>(); for (int i = 0; i < 3; i++) { File curDir = new File(baseDir, "nextvolume-" + i); @@ -59,7 +66,7 @@ public class FsVolumeListTest { conf, StorageType.DEFAULT); volume.setCapacityForTesting(1024 * 1024 * 1024); volumes.add(volume); - volumeList.addVolume(volume); + volumeList.addVolume(volume.obtainReference()); } // Close the second volume. @@ -75,7 +82,7 @@ public class FsVolumeListTest { @Test public void testCheckDirsWithClosedVolume() throws IOException { - FsVolumeList volumeList = new FsVolumeList(0, blockChooser); + FsVolumeList volumeList = new FsVolumeList(0, blockScanner, blockChooser); List<FsVolumeImpl> volumes = new ArrayList<>(); for (int i = 0; i < 3; i++) { File curDir = new File(baseDir, "volume-" + i); @@ -83,7 +90,7 @@ public class FsVolumeListTest { FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", curDir, conf, StorageType.DEFAULT); volumes.add(volume); - volumeList.addVolume(volume); + volumeList.addVolume(volume.obtainReference()); } // Close the 2nd volume. @@ -91,4 +98,4 @@ public class FsVolumeListTest { // checkDirs() should ignore the 2nd volume since it is closed. volumeList.checkDirs(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index ca936b3..f3d15de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -22,17 +22,17 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.StorageType; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.datanode.BlockScanner; import org.apache.hadoop.hdfs.server.datanode.DNConf; -import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.test.GenericTestUtils; @@ -51,19 +51,17 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyListOf; -import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -81,7 +79,6 @@ public class TestFsDatasetImpl { private Configuration conf; private DataNode datanode; private DataStorage storage; - private DataBlockScanner scanner; private FsDatasetImpl dataset; private static Storage.StorageDirectory createStorageDirectory(File root) { @@ -112,13 +109,14 @@ public class TestFsDatasetImpl { public void setUp() throws IOException { datanode = mock(DataNode.class); storage = mock(DataStorage.class); - scanner = mock(DataBlockScanner.class); this.conf = new Configuration(); + this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0); final DNConf dnConf = new DNConf(conf); when(datanode.getConf()).thenReturn(conf); when(datanode.getDnConf()).thenReturn(dnConf); - when(datanode.getBlockScanner()).thenReturn(scanner); + final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf); + when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner); createStorageDirs(storage, conf, NUM_INIT_VOLUMES); dataset = new FsDatasetImpl(datanode, storage, conf); @@ -208,10 +206,6 @@ public class TestFsDatasetImpl { assertEquals("The replica infos on this volume has been removed from the " + "volumeMap.", NUM_BLOCKS / NUM_INIT_VOLUMES, totalNumReplicas); - - // Verify that every BlockPool deletes the removed blocks from the volume. - verify(scanner, times(BLOCK_POOL_IDS.length)) - .deleteBlocks(anyString(), any(Block[].class)); } @Test(timeout = 5000) @@ -245,7 +239,9 @@ public class TestFsDatasetImpl { public void testChangeVolumeWithRunningCheckDirs() throws IOException { RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser = new RoundRobinVolumeChoosingPolicy<>(); - final FsVolumeList volumeList = new FsVolumeList(0, blockChooser); + final BlockScanner blockScanner = new BlockScanner(datanode, conf); + final FsVolumeList volumeList = + new FsVolumeList(0, blockScanner, blockChooser); final List<FsVolumeImpl> oldVolumes = new ArrayList<>(); // Initialize FsVolumeList with 5 mock volumes. @@ -254,19 +250,23 @@ public class TestFsDatasetImpl { FsVolumeImpl volume = mock(FsVolumeImpl.class); oldVolumes.add(volume); when(volume.getBasePath()).thenReturn("data" + i); - volumeList.addVolume(volume); + FsVolumeReference ref = mock(FsVolumeReference.class); + when(ref.getVolume()).thenReturn(volume); + volumeList.addVolume(ref); } // When call checkDirs() on the 2nd volume, anther "thread" removes the 5th // volume and add another volume. It does not affect checkDirs() running. final FsVolumeImpl newVolume = mock(FsVolumeImpl.class); + final FsVolumeReference newRef = mock(FsVolumeReference.class); + when(newRef.getVolume()).thenReturn(newVolume); FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1); doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { volumeList.removeVolume(new File("data4")); - volumeList.addVolume(newVolume); + volumeList.addVolume(newRef); return null; } }).when(blockedVolume).checkDirs(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java index 3609684..6cc3d7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java @@ -184,8 +184,8 @@ public class TestInterDatanodeProtocol { InterDatanodeProtocol idp = DataNodeTestUtils.createInterDatanodeProtocolProxy( datanode, datanodeinfo[0], conf, useDnHostname); - //stop block scanner, so we could compare lastScanTime - DataNodeTestUtils.shutdownBlockScanner(datanode); + // Stop the block scanners. + datanode.getBlockScanner().removeAllVolumeScanners(); //verify BlockMetaDataInfo ExtendedBlock b = locatedblock.getBlock();
