Author: suresh
Date: Sat Feb 4 02:16:46 2012
New Revision: 1240440
URL: http://svn.apache.org/viewvc?rev=1240440&view=rev
Log:
HDFS-2379. Merging change r1196456 from branch-1
Added:
hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReportGeneration.java
Modified:
hadoop/common/branches/branch-1.0/CHANGES.txt
hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
Modified: hadoop/common/branches/branch-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/CHANGES.txt?rev=1240440&r1=1240439&r2=1240440&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.0/CHANGES.txt Sat Feb 4 02:16:46 2012
@@ -18,6 +18,9 @@ Release 1.0.1 - 2012.01.30
HADOOP-7470. Move up to Jackson 1.8.8. (Enis Soztutar via szetszwo)
+ HDFS-2379. Allow block reports to proceed without holding FSDataset lock.
+ (todd via suresh)
+
BUG FIXES
HADOOP-7960. Port HADOOP-5203 to branch-1, build version comparison is too
Modified: hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1240440&r1=1240439&r2=1240440&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Feb 4 02:16:46 2012
@@ -248,7 +248,16 @@ public class DataNode extends Configured
public final static String DATA_DIR_PERMISSION_KEY =
"dfs.datanode.data.dir.perm";
private static final String DEFAULT_DATA_DIR_PERMISSION = "755";
-
+
+ // Thresholds for when we start to log when a block report is
+ // taking a long time to generate. Under heavy disk load and
+ // memory pressure, it's normal for block reports to take
+ // several minutes, since they cause many disk seeks.
+ private static final long LATE_BLOCK_REPORT_WARN_THRESHOLD =
+ 10 * 60 * 1000; // 10m
+ private static final long LATE_BLOCK_REPORT_INFO_THRESHOLD =
+ 3 * 60 * 1000; // 3m
+
// For InterDataNodeProtocol
public Server ipcServer;
@@ -713,6 +722,8 @@ public class DataNode extends Configured
namenode.blocksBeingWrittenReport(dnRegistration, blocksBeingWritten);
}
// random short delay - helps scatter the BR from all DNs
+ // - but we can start generating the block report immediately
+ data.requestAsyncBlockReport();
scheduleBlockReport(initialBlockReportDelay);
}
@@ -937,42 +948,60 @@ public class DataNode extends Configured
// Send latest blockinfo report if timer has expired.
if (startTime - lastBlockReport > blockReportInterval) {
-
- // Create block report
- long brCreateStartTime = now();
- Block[] bReport = data.getBlockReport();
-
- // Send block report
- long brSendStartTime = now();
- DatanodeCommand cmd = namenode.blockReport(dnRegistration,
- BlockListAsLongs.convertToArrayLongs(bReport));
-
- // Log the block report processing stats from Datanode perspective
- long brSendCost = now() - brSendStartTime;
- long brCreateCost = brSendStartTime - brCreateStartTime;
- myMetrics.addBlockReport(brSendCost);
- LOG.info("BlockReport of " + bReport.length
- + " blocks took " + brCreateCost + " msec to generate and "
- + brSendCost + " msecs for RPC and NN processing");
-
- //
- // If we have sent the first block report, then wait a random
- // time before we start the periodic block reports.
- //
- if (resetBlockReportTime) {
- lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
- resetBlockReportTime = false;
+ if (data.isAsyncBlockReportReady()) {
+ // Create block report
+ long brCreateStartTime = now();
+ Block[] bReport = data.retrieveAsyncBlockReport();
+
+ // Send block report
+ long brSendStartTime = now();
+ DatanodeCommand cmd = namenode.blockReport(dnRegistration,
+ BlockListAsLongs.convertToArrayLongs(bReport));
+
+ // Log the block report processing stats from Datanode perspective
+ long brSendCost = now() - brSendStartTime;
+ long brCreateCost = brSendStartTime - brCreateStartTime;
+ myMetrics.addBlockReport(brSendCost);
+ LOG.info("BlockReport of " + bReport.length
+ + " blocks took " + brCreateCost + " msec to generate and "
+ + brSendCost + " msecs for RPC and NN processing");
+
+ //
+ // If we have sent the first block report, then wait a random
+ // time before we start the periodic block reports.
+ //
+ if (resetBlockReportTime) {
+ lastBlockReport = startTime -
+ R.nextInt((int)(blockReportInterval));
+ resetBlockReportTime = false;
+ } else {
+ /* say the last block report was at 8:20:14. The current report
+ * should have started around 9:20:14 (default 1 hour interval).
+ * If current time is :
+ * 1) normal like 9:20:18, next report should be at 10:20:14
+ * 2) unexpected like 11:35:43, next report should be at
+ * 12:20:14
+ */
+ lastBlockReport += (now() - lastBlockReport) /
+ blockReportInterval * blockReportInterval;
+ }
+ processCommand(cmd);
} else {
- /* say the last block report was at 8:20:14. The current report
- * should have started around 9:20:14 (default 1 hour interval).
- * If current time is :
- * 1) normal like 9:20:18, next report should be at 10:20:14
- * 2) unexpected like 11:35:43, next report should be at 12:20:14
- */
- lastBlockReport += (now() - lastBlockReport) /
- blockReportInterval * blockReportInterval;
+ data.requestAsyncBlockReport();
+ if (lastBlockReport > 0) { // this isn't the first report
+ long waitingFor =
+ startTime - lastBlockReport - blockReportInterval;
+ String msg = "Block report is due, and been waiting for it for "
+
+ (waitingFor/1000) + " seconds...";
+ if (waitingFor > LATE_BLOCK_REPORT_WARN_THRESHOLD) {
+ LOG.warn(msg);
+ } else if (waitingFor > LATE_BLOCK_REPORT_INFO_THRESHOLD) {
+ LOG.info(msg);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug(msg);
+ }
+ }
}
- processCommand(cmd);
}
// start block scanner
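In outline, the heartbeat loop above now follows a non-blocking pattern (a simplified sketch of the hunk above, not the literal branch-1 code; data is the DataNode's FSDatasetInterface):

    // Once a block report is due, poll for the async scan result instead of
    // generating the report inline while holding the FSDataset lock.
    if (startTime - lastBlockReport > blockReportInterval) {
      if (data.isAsyncBlockReportReady()) {
        Block[] bReport = data.retrieveAsyncBlockReport();  // scan already done
        DatanodeCommand cmd = namenode.blockReport(dnRegistration,
            BlockListAsLongs.convertToArrayLongs(bReport));
        processCommand(cmd);
      } else {
        data.requestAsyncBlockReport();  // harmless to repeat; arms the scanner
        // log at DEBUG, INFO, or WARN as the wait grows past the
        // LATE_BLOCK_REPORT_*_THRESHOLD values added above
      }
    }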
Modified: hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1240440&r1=1240439&r2=1240440&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Sat Feb 4 02:16:46 2012
@@ -32,6 +32,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.TreeSet;
import javax.management.NotCompliantMBeanException;
@@ -66,22 +67,23 @@ public class FSDataset implements FSCons
/** Find the metadata file for the specified block file.
* Return the generation stamp from the name of the metafile.
*/
- private static long getGenerationStampFromFile(File[] listdir, File blockFile) {
- String blockName = blockFile.getName();
+ static long getGenerationStampFromFile(File[] listdir, File blockFile) {
+ String blockNamePrefix = blockFile.getName() + "_";
+ // blockNamePrefix is blk_12345_
+ // the path we're looking for looks like blk_12345_GENSTAMP.meta
+
for (int j = 0; j < listdir.length; j++) {
String path = listdir[j].getName();
- if (!path.startsWith(blockName)) {
- continue;
- }
- String[] vals = path.split("_");
- if (vals.length != 3) { // blk, blkid, genstamp.meta
+ if (!path.startsWith(blockNamePrefix)) {
continue;
}
- String[] str = vals[2].split("\\.");
- if (str.length != 2) {
+ if (!path.endsWith(".meta")) {
continue;
}
- return Long.parseLong(str[0]);
+
+ String metaPart = path.substring(blockNamePrefix.length(),
+ path.length() - METADATA_EXTENSION_LENGTH);
+ return Long.parseLong(metaPart);
}
DataNode.LOG.warn("Block " + blockFile +
" does not have a metafile!");
@@ -214,30 +216,6 @@ public class FSDataset implements FSCons
/**
* Populate the given blockSet with any child blocks
- * found at this node.
- */
- public void getBlockInfo(TreeSet<Block> blockSet) {
- if (children != null) {
- for (int i = 0; i < children.length; i++) {
- children[i].getBlockInfo(blockSet);
- }
- }
-
- File blockFiles[] = dir.listFiles();
- if (blockFiles != null) {
- for (int i = 0; i < blockFiles.length; i++) {
- if (Block.isBlockFilename(blockFiles[i])) {
- long genStamp = FSDataset.getGenerationStampFromFile(blockFiles,
- blockFiles[i]);
- blockSet.add(new Block(blockFiles[i], blockFiles[i].length(),
- genStamp));
- }
- }
- }
- }
-
- /**
- * Populate the given blockSet with any child blocks
* found at this node. With each block, return the full path
* of the block file.
*/
@@ -525,9 +503,43 @@ public class FSDataset implements FSCons
DiskChecker.checkDir(tmpDir);
DiskChecker.checkDir(blocksBeingWritten);
}
-
- void getBlockInfo(TreeSet<Block> blockSet) {
- dataDir.getBlockInfo(blockSet);
+
+ void scanBlockFilesInconsistent(Map<Block, File> results) {
+ scanBlockFilesInconsistent(dataDir.dir, results);
+ }
+
+ /**
+ * Recursively scan the given directory, generating a map where
+ * each key is a discovered block, and the value is the actual
+ * file for that block.
+ *
+ * This is unsynchronized since it can take quite some time
+ * when inodes and dentries have been paged out of cache.
+ * After the scan is completed, we reconcile it with
+ * the current disk state in reconcileRoughBlockScan.
+ */
+ private void scanBlockFilesInconsistent(
+ File dir, Map<Block, File> results) {
+ File filesInDir[] = dir.listFiles();
+ if (filesInDir != null) {
+ for (File f : filesInDir) {
+ if (Block.isBlockFilename(f)) {
+ long blockLen = f.length();
+ if (blockLen == 0 && !f.exists()) {
+ // length 0 could indicate a race where this file was removed
+ // while we were in the middle of generating the report.
+ continue;
+ }
+ long genStamp = FSDataset.getGenerationStampFromFile(filesInDir, f);
+ Block b = new Block(f, blockLen, genStamp);
+ results.put(b, f);
+ } else if (f.getName().startsWith("subdir")) {
+ // the startsWith check is much faster than the
+ // stat() call invoked by isDirectory()
+ scanBlockFilesInconsistent(f, results);
+ }
+ }
+ }
}
void getBlocksBeingWrittenInfo(TreeSet<Block> blockSet) {
@@ -685,13 +697,20 @@ public class FSDataset implements FSCons
}
return remaining;
}
+
+ void scanBlockFilesInconsistent(Map<Block, File> results) {
+ // Make a local consistent copy of the volume list, since
+ // it might change due to a disk failure
+ FSVolume volumesCopy[];
+ synchronized (this) {
+ volumesCopy = Arrays.copyOf(volumes, volumes.length);
+ }
- synchronized void getBlockInfo(TreeSet<Block> blockSet) {
- for (int idx = 0; idx < volumes.length; idx++) {
- volumes[idx].getBlockInfo(blockSet);
+ for (FSVolume vol : volumesCopy) {
+ vol.scanBlockFilesInconsistent(results);
}
}
-
+
synchronized void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
for (int idx = 0; idx < volumes.length; idx++) {
volumes[idx].getVolumeMap(volumeMap);
@@ -772,6 +791,8 @@ public class FSDataset implements FSCons
//Find better place?
public static final String METADATA_EXTENSION = ".meta";
+ public static final int METADATA_EXTENSION_LENGTH =
+ METADATA_EXTENSION.length();
public static final short METADATA_VERSION = 1;
@@ -926,6 +947,9 @@ public class FSDataset implements FSCons
private int validVolsRequired;
FSDatasetAsyncDiskService asyncDiskService;
+ private final AsyncBlockReport asyncBlockReport;
+
+
/**
* An FSDataset has a directory where it loads its data files.
*/
@@ -966,6 +990,8 @@ public class FSDataset implements FSCons
}
volumes = new FSVolumeSet(volArray);
volumes.getVolumeMap(volumeMap);
+ asyncBlockReport = new AsyncBlockReport(this);
+ asyncBlockReport.start();
File[] roots = new File[storage.getNumStorageDirs()];
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
roots[idx] = storage.getStorageDir(idx).getCurrentDir();
@@ -1656,18 +1682,125 @@ public class FSDataset implements FSCons
return blockTable;
}
+ @Override
+ public void requestAsyncBlockReport() {
+ asyncBlockReport.request();
+ }
+
+ @Override
+ public boolean isAsyncBlockReportReady() {
+ return asyncBlockReport.isReady();
+ }
+
+ @Override
+ public Block[] retrieveAsyncBlockReport() {
+ HashMap<Block, File> seenOnDisk = asyncBlockReport.getAndReset();
+ return reconcileRoughBlockScan(seenOnDisk);
+ }
+
/**
- * Return a table of block data
+ * Return a table of block data. This method is synchronous, and is used
+ * by tests and during block scanner startup.
*/
public Block[] getBlockReport() {
- TreeSet<Block> blockSet = new TreeSet<Block>();
- volumes.getBlockInfo(blockSet);
- Block blockTable[] = new Block[blockSet.size()];
- int i = 0;
- for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {
- blockTable[i] = it.next();
+ long st = System.currentTimeMillis();
+ HashMap<Block, File> seenOnDisk = roughBlockScan();
+ // the above results are inconsistent since modifications
+ // happened concurrently. Now check any diffs
+ DataNode.LOG.info("Generated rough (lockless) block report in "
+ + (System.currentTimeMillis() - st) + " ms");
+ return reconcileRoughBlockScan(seenOnDisk);
+ }
+
+ private Block[] reconcileRoughBlockScan(HashMap<Block, File> seenOnDisk) {
+ Set<Block> blockReport;
+ synchronized (this) {
+ long st = System.currentTimeMillis();
+ // broken out to a static method to simplify testing
+ reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+ DataNode.LOG.info(
+ "Reconciled asynchronous block report against current state in " +
+ (System.currentTimeMillis() - st) + " ms");
+
+ blockReport = seenOnDisk.keySet();
+ }
+
+ return blockReport.toArray(new Block[0]);
+ }
+
+ /**
+ * Scan the blocks in the dataset on disk, without holding any
+ * locks. This generates a "rough" block report, since there
+ * may be concurrent modifications to the disk structure.
+ */
+ HashMap<Block, File> roughBlockScan() {
+ int expectedNumBlocks;
+ synchronized (this) {
+ expectedNumBlocks = volumeMap.size();
+ }
+ HashMap<Block, File> seenOnDisk =
+ new HashMap<Block, File>(expectedNumBlocks, 1.1f);
+ volumes.scanBlockFilesInconsistent(seenOnDisk);
+ return seenOnDisk;
+ }
+
+ static void reconcileRoughBlockScan(
+ Map<Block, File> seenOnDisk,
+ Map<Block, DatanodeBlockInfo> volumeMap,
+ Map<Block,ActiveFile> ongoingCreates) {
+
+ int numDeletedAfterScan = 0;
+ int numAddedAfterScan = 0;
+ int numOngoingIgnored = 0;
+
+ // remove anything seen on disk that's no longer in the memory map,
+ // or got reopened while we were scanning
+ Iterator<Map.Entry<Block, File>> iter = seenOnDisk.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<Block, File> entry = iter.next();
+ Block b = entry.getKey();
+
+ if (!volumeMap.containsKey(b) || ongoingCreates.containsKey(b)) {
+ File blockFile = entry.getValue();
+ File metaFile = getMetaFile(blockFile, b);
+ if (!blockFile.exists() || !metaFile.exists()) {
+ // the block was deleted (or had its generation stamp changed)
+ // after it was scanned on disk... If the genstamp changed,
+ // it will be added below when we scan volumeMap
+ iter.remove();
+ numDeletedAfterScan++;
+ }
+ }
+ }
+
+ // add anything from the in-memory map that wasn't seen on disk,
+ // if and only if the file is now verifiably on disk.
+ for (Map.Entry<Block, DatanodeBlockInfo> entry : volumeMap.entrySet()) {
+ Block b = entry.getKey();
+ if (ongoingCreates.containsKey(b)) {
+ // don't add these to block reports
+ numOngoingIgnored++;
+ continue;
+ }
+ DatanodeBlockInfo info = entry.getValue();
+ if (!seenOnDisk.containsKey(b) && info.getFile().exists()) {
+ // add a copy, and use the length from disk instead of from memory
+ Block toAdd = new Block(
+ b.getBlockId(), info.getFile().length(), b.getGenerationStamp());
+ seenOnDisk.put(toAdd, info.getFile());
+ numAddedAfterScan++;
+ }
+ // if the file is in memory but _not_ on disk, this is the situation
+ // in which an administrator accidentally "rm -rf"ed part of a data
+ // directory. We should _not_ report these blocks.
+ }
+
+ if (numDeletedAfterScan + numAddedAfterScan + numOngoingIgnored > 0) {
+ DataNode.LOG.info("Reconciled asynchronous block scan with filesystem. "
+
+ numDeletedAfterScan + " blocks concurrently deleted during scan, " +
+ numAddedAfterScan + " blocks concurrently added during scan, " +
+ numOngoingIgnored + " ongoing creations ignored");
}
- return blockTable;
}
/**
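The reconcile step boils down to two passes over the rough scan. Below is a self-contained model of those rules, using plain string IDs in place of Hadoop's Block, DatanodeBlockInfo, and ActiveFile types (names hypothetical; the real method also re-reads the block length from disk and checks both the block file and its metafile):

    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.Set;

    class ReconcileModel {
      static Set<String> reconcile(Set<String> seenByScan,
                                   Set<String> volumeMap,
                                   Set<String> ongoingCreates,
                                   Set<String> nowOnDisk) {
        Set<String> report = new HashSet<String>(seenByScan);
        // Pass 1: drop scanned blocks that vanished from the memory map or
        // were reopened, unless they are still verifiably on disk.
        for (Iterator<String> it = report.iterator(); it.hasNext(); ) {
          String b = it.next();
          if ((!volumeMap.contains(b) || ongoingCreates.contains(b))
              && !nowOnDisk.contains(b)) {
            it.remove();
          }
        }
        // Pass 2: add in-memory blocks the scan missed, but only if they are
        // now verifiably on disk; blocks still being created are never added.
        for (String b : volumeMap) {
          if (!ongoingCreates.contains(b) && !report.contains(b)
              && nowOnDisk.contains(b)) {
            report.add(b);
          }
        }
        return report;
      }
    }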
@@ -1937,6 +2070,10 @@ public class FSDataset implements FSCons
asyncDiskService.shutdown();
}
+ if (asyncBlockReport != null) {
+ asyncBlockReport.shutdown();
+ }
+
if(volumes != null) {
for (FSVolume volume : volumes.volumes) {
if(volume != null) {
@@ -2031,4 +2168,91 @@ public class FSDataset implements FSCons
return info;
}
}
+
+ /**
+ * Thread which handles generating "rough" block reports in the background.
+ * Callers should call request(), and then poll isReady() while the
+ * work happens. When isReady() returns true, getAndReset() may be
+ * called to retrieve the results.
+ */
+ static class AsyncBlockReport implements Runnable {
+ private final Thread thread;
+ private final FSDataset fsd;
+
+ boolean requested = false;
+ boolean shouldRun = true;
+ private HashMap<Block, File> scan = null;
+
+ AsyncBlockReport(FSDataset fsd) {
+ this.fsd = fsd;
+ thread = new Thread(this, "Async Block Report Generator");
+ thread.setDaemon(true);
+ }
+
+ void start() {
+ thread.start();
+ }
+
+ synchronized void shutdown() {
+ shouldRun = false;
+ thread.interrupt();
+ }
+
+ synchronized boolean isReady() {
+ return scan != null;
+ }
+
+ synchronized HashMap<Block, File> getAndReset() {
+ if (!isReady()) {
+ throw new IllegalStateException("report not ready!");
+ }
+ HashMap<Block, File> ret = scan;
+ scan = null;
+ requested = false;
+ return ret;
+ }
+
+ synchronized void request() {
+ requested = true;
+ notifyAll();
+ }
+
+ @Override
+ public void run() {
+ while (shouldRun) {
+ try {
+ waitForReportRequest();
+ assert requested && scan == null;
+
+ DataNode.LOG.info("Starting asynchronous block report scan");
+ long st = System.currentTimeMillis();
+ HashMap<Block, File> result = fsd.roughBlockScan();
+ DataNode.LOG.info("Finished asynchronous block report scan in "
+ + (System.currentTimeMillis() - st) + "ms");
+
+ synchronized (this) {
+ assert scan == null;
+ this.scan = result;
+ }
+ } catch (InterruptedException ie) {
+ // interrupted to end scanner
+ } catch (Throwable t) {
+ DataNode.LOG.error("Async Block Report thread caught exception", t);
+ try {
+ // Avoid busy-looping in the case that we have entered some invalid
+ // state -- don't want to flood the error log with exceptions.
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+
+ private synchronized void waitForReportRequest()
+ throws InterruptedException {
+ while (!(requested && scan == null)) {
+ wait(5000);
+ }
+ }
+ }
}
Modified: hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1240440&r1=1240439&r2=1240440&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Sat Feb 4 02:16:46 2012
@@ -238,6 +238,29 @@ public interface FSDatasetInterface exte
public Block[] getBlockReport();
/**
+ * Request that a block report be prepared.
+ */
+ public void requestAsyncBlockReport();
+
+ /**
+ * @return true if an asynchronous block report is ready
+ */
+ public boolean isAsyncBlockReportReady();
+
+ /**
+ * Retrieve an asynchronously prepared block report. Callers should first
+ * call {@link #requestAsyncBlockReport()}, and then poll
+ * {@link #isAsyncBlockReportReady()} until it returns true.
+ *
+ * Retrieving the asynchronous block report also resets it; a new
+ * one must be prepared before this method may be called again.
+ *
+ * @throws IllegalStateException if an async report is not ready
+ */
+ public Block[] retrieveAsyncBlockReport();
+
+
+ /**
* Returns the blocks being written report
* @return - the blocks being written report
*/
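A minimal sketch of the intended call sequence against this interface (a hypothetical helper; the DataNode itself checks readiness once per heartbeat loop rather than sleeping):

    static Block[] fetchBlockReport(FSDatasetInterface data)
        throws InterruptedException {
      data.requestAsyncBlockReport();          // kick off the background scan
      while (!data.isAsyncBlockReportReady()) {
        Thread.sleep(100);                     // poll until the scan completes
      }
      // retrieving also resets readiness; calling it again before another
      // request/ready cycle throws IllegalStateException
      return data.retrieveAsyncBlockReport();
    }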
Modified: hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1240440&r1=1240439&r2=1240440&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Sat Feb 4 02:16:46 2012
@@ -308,6 +308,21 @@ public class SimulatedFSDataset impleme
}
return blockTable;
}
+
+ @Override
+ public void requestAsyncBlockReport() {
+ }
+
+ @Override
+ public boolean isAsyncBlockReportReady() {
+ return true;
+ }
+
+ @Override
+ public Block[] retrieveAsyncBlockReport() {
+ return getBlockReport();
+ }
+
public long getCapacity() throws IOException {
return storage.getCapacity();
Added: hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReportGeneration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReportGeneration.java?rev=1240440&view=auto
==============================================================================
--- hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReportGeneration.java (added)
+++ hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReportGeneration.java Sat Feb 4 02:16:46 2012
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.ActiveFile;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.AsyncBlockReport;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestBlockReportGeneration {
+ private static final long BLKID = 12345L;
+ private static final long GENSTAMP = 1000L;
+ private static final long LEN = 65536L;
+
+ private static final Block FAKE_BLK =
+ new Block(BLKID, LEN, GENSTAMP);
+
+
+ static final File TEST_DIR = new File(
+ System.getProperty("test.build.data") + File.pathSeparatorChar
+ + "TestBlockReportGeneration");
+
+ Map<Block, File> seenOnDisk = new HashMap<Block, File>();
+ Map<Block, DatanodeBlockInfo> volumeMap =
+ new HashMap<Block, DatanodeBlockInfo>();
+ Map<Block, ActiveFile> ongoingCreates = new HashMap<Block, ActiveFile>();
+
+
+ @Before
+ public void cleanupTestDir() throws IOException {
+ FileUtil.fullyDelete(TEST_DIR);
+ assertTrue(TEST_DIR.mkdirs());
+ }
+
+ @Test
+ public void testEmpty() {
+ FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+ assertTrue(seenOnDisk.isEmpty());
+ }
+
+ /**
+ * Test for case where for some reason there's a block on disk
+ * that got lost in volumemap - we want to report this.
+ */
+ @Test
+ public void testOnDiskButNotMemory() {
+ fakeSeenByScan(FAKE_BLK);
+ fakeBlockOnDisk(FAKE_BLK);
+
+ FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+ // should still be in map, since it was seen to still exist on disk
+ // (exists returns true)
+ assertTrue(seenOnDisk.containsKey(FAKE_BLK));
+ }
+
+ /**
+ * Test for case where we lost a block from disk - eg a user rm -Rfed
+ * a data dir accidentally.
+ */
+ @Test
+ public void testInMemoryButNotOnDisk() {
+ fakeInVolumeMap(FAKE_BLK);
+
+ assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+ assertTrue(volumeMap.containsKey(FAKE_BLK));
+ FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+ // should not be added to the map, since it's truly not on disk
+ assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+ }
+
+ /**
+ * Test for case where we concurrently removed a block, so it is
+ * seen in the scan, but during reconciliation is no longer on disk.
+ */
+ @Test
+ public void testRemovedAfterScan() {
+ fakeSeenByScan(FAKE_BLK);
+
+ assertTrue(seenOnDisk.containsKey(FAKE_BLK));
+ assertFalse(volumeMap.containsKey(FAKE_BLK));
+ FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+ // should be removed from the map since .exists() returns false
+ assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+ }
+
+ /**
+ * Test for case where we concurrently added a block, so it is
+ * not seen in scan, but is in volumeMap and on disk during
+ * reconciliation.
+ */
+ @Test
+ public void testAddedAfterScan() {
+ fakeInVolumeMap(FAKE_BLK);
+ fakeBlockOnDisk(FAKE_BLK);
+
+ assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+ assertTrue(volumeMap.containsKey(FAKE_BLK));
+ FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+ // should be added, since it's found on disk when reconciling
+ assertTrue(seenOnDisk.containsKey(FAKE_BLK));
+ }
+
+ /**
+ * Test for case where we concurrently changed the generation stamp
+ * of a block. So, we scanned it with one GS, but at reconciliation
+ * time it has a new GS.
+ */
+ @Test
+ public void testGenstampChangedAfterScan() {
+ Block oldGenStamp = FAKE_BLK;
+ Block newGenStamp = new Block(FAKE_BLK);
+ newGenStamp.setGenerationStamp(GENSTAMP + 1);
+
+ fakeSeenByScan(oldGenStamp);
+ fakeInVolumeMap(newGenStamp);
+ fakeBlockOnDisk(newGenStamp);
+
+ assertTrue(seenOnDisk.containsKey(oldGenStamp));
+
+ FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+ // old genstamp should not be added
+ assertFalse(seenOnDisk.containsKey(oldGenStamp));
+ // new genstamp should be added, since it's found on disk when reconciling
+ assertTrue(seenOnDisk.containsKey(newGenStamp));
+ }
+
+ @Test
+ public void testGetGenerationStampFromFile() {
+ File[] fileList = new File[] {
+ // include some junk files which should be ignored
+ new File("blk_-1362850638739812068_5351.meta.foo"),
+ new File("blk_-1362850638739812068_5351meta"),
+ // the real dir
+ new File("."),
+ new File(".."),
+ new File("blk_-1362850638739812068"),
+ new File("blk_-1362850638739812068_5351.meta"),
+ new File("blk_1453973893701037484"),
+ new File("blk_1453973893701037484_4804.meta"),
+ };
+
+ assertEquals(4804, FSDataset.getGenerationStampFromFile(fileList,
+ new File("blk_1453973893701037484")));
+ // try a prefix of a good block ID
+ assertEquals(Block.GRANDFATHER_GENERATION_STAMP,
+ FSDataset.getGenerationStampFromFile(fileList,
+ new File("blk_145397389370103")));
+
+ assertEquals(Block.GRANDFATHER_GENERATION_STAMP,
+ FSDataset.getGenerationStampFromFile(fileList,
+ new File("blk_99999")));
+
+ // pass nonsense value
+ assertEquals(Block.GRANDFATHER_GENERATION_STAMP,
+ FSDataset.getGenerationStampFromFile(fileList,
+ new File("blk_")));
+ }
+
+
+ /**
+ * Test case for blocks being created - these are not seen by the
+ * scan since they're in the bbw/ dir, not current/ -- but
+ * they are in volumeMap and ongoingCreates. These should not
+ * be reported.
+ */
+ @Test
+ public void testFileBeingCreated() {
+ fakeInVolumeMap(FAKE_BLK);
+ fakeBlockOnDisk(FAKE_BLK);
+ fakeBeingCreated(FAKE_BLK);
+
+ assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+ assertTrue(volumeMap.containsKey(FAKE_BLK));
+ FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+ // should not be added, since it's in the midst of being created!
+ assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+ }
+
+ /**
+ * Test for case where we reopened a block during the scan
+ */
+ @Test
+ public void testReopenedDuringScan() {
+ fakeSeenByScan(FAKE_BLK);
+ fakeInVolumeMap(FAKE_BLK);
+ fakeBeingCreated(FAKE_BLK);
+
+ assertTrue(seenOnDisk.containsKey(FAKE_BLK));
+ assertTrue(volumeMap.containsKey(FAKE_BLK));
+ FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+ // should be removed from the map since .exists() returns false
+ assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+ }
+
+ @Test(timeout=20000)
+ public void testAsyncReport() throws Exception {
+ FSDataset mock = Mockito.mock(FSDataset.class);
+ AsyncBlockReport abr = new FSDataset.AsyncBlockReport(mock);
+ abr.start();
+ try {
+ for (int i = 0; i < 3; i++) {
+ HashMap<Block, File> mockResult = new HashMap<Block, File>();
+ Mockito.doReturn(mockResult).when(mock).roughBlockScan();
+
+ assertFalse(abr.isReady());
+ abr.request();
+ while (!abr.isReady()) {
+ Thread.sleep(10);
+ }
+ assertSame(mockResult, abr.getAndReset());
+ assertFalse(abr.isReady());
+ }
+ } finally {
+ abr.shutdown();
+ }
+ }
+
+ private void fakeBeingCreated(Block b) {
+ ongoingCreates.put(b,
+ new ActiveFile(blockFile(b), new ArrayList<Thread>()));
+ }
+
+ private void fakeInVolumeMap(Block b) {
+ volumeMap.put(b, new DatanodeBlockInfo(null, blockFile(b)));
+ }
+
+ private void fakeBlockOnDisk(Block b) {
+ File f = blockFile(b);
+ try {
+ f.createNewFile();
+ FSDataset.getMetaFile(f, b).createNewFile();
+ } catch (IOException e) {
+ throw new RuntimeException("Could not create: " + f);
+ }
+ }
+
+ private void fakeSeenByScan(Block b) {
+ seenOnDisk.put(b, blockFile(b));
+ }
+
+ private File blockFile(Block b) {
+ return new File(TEST_DIR, b.getBlockName());
+ }
+}
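On branch-1 the new test should be runnable on its own with the usual Ant target, e.g. ant test -Dtestcase=TestBlockReportGeneration (assuming a standard branch-1 build setup).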