Author: rangadi
Date: Fri Apr 10 20:14:14 2009
New Revision: 764031
URL: http://svn.apache.org/viewvc?rev=764031&view=rev
Log:
HADOOP-4584. Improve datanode block reports and associated file system
scan to avoid interfering with normal datanode operations.
Added:
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/hdfs/hdfs-default.xml
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/Block.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=764031&r1=764030&r2=764031&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Apr 10 20:14:14 2009
@@ -212,6 +212,9 @@
HADOOP-2413. Remove the static variable FSNamesystem.fsNamesystemObject.
(Konstantin Shvachko via szetszwo)
+ HADOOP-4584. Improve datanode block reports and associated file system
+ scan to avoid interfering with normal datanode operations.
+
OPTIMIZATIONS
HADOOP-5595. NameNode does not need to run a replicator to choose a
Modified: hadoop/core/trunk/src/hdfs/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/hdfs-default.xml?rev=764031&r1=764030&r2=764031&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/hdfs-default.xml (original)
+++ hadoop/core/trunk/src/hdfs/hdfs-default.xml Fri Apr 10 20:14:14 2009
@@ -271,6 +271,14 @@
</property>
<property>
+ <name>dfs.datanode.directoryscan.interval</name>
+ <value>21600</value>
+ <description>Interval in seconds for Datanode to scan data directories and
+ reconcile the difference between blocks in memory and on the disk.
+ </description>
+</property>
+
+<property>
<name>dfs.heartbeat.interval</name>
<value>3</value>
<description>Determines datanode heartbeat interval in seconds.</description>
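
For reference, a minimal sketch of how a datanode-side component could consume the new key; the property name and the 21600-second default come from the entry above, while the class and method names here are illustrative only:

    import org.apache.hadoop.conf.Configuration;

    // Illustrative helper only: shows the configuration lookup pattern that
    // DirectoryScanner (added below) uses to derive its scan period.
    class ScanIntervalSketch {
      static long scanPeriodMillis(Configuration conf) {
        // 21600 seconds (6 hours) is the hdfs-default.xml default
        int intervalSecs = conf.getInt("dfs.datanode.directoryscan.interval", 21600);
        return intervalSecs * 1000L; // the scanner tracks time in milliseconds
      }
    }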
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/Block.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/Block.java?rev=764031&r1=764030&r2=764031&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/Block.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/Block.java Fri Apr 10 20:14:14 2009
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.protocol;
import java.io.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.io.*;
@@ -28,7 +30,8 @@
*
**************************************************/
public class Block implements Writable, Comparable<Block> {
-
+ public static final String BLOCK_FILE_PREFIX = "blk_";
+ public static final String METADATA_EXTENSION = ".meta";
static { // register a ctor
WritableFactories.setFactory
(Block.class,
@@ -41,20 +44,41 @@
// a generation stamp.
public static final long GRANDFATHER_GENERATION_STAMP = 0;
- /**
- */
+ public static final Pattern blockFilePattern = Pattern
+ .compile(BLOCK_FILE_PREFIX + "(-??\\d++)$");
+ public static final Pattern metaFilePattern = Pattern
+ .compile(BLOCK_FILE_PREFIX + "(-??\\d++)_(\\d++)\\" + METADATA_EXTENSION
+ + "$");
+
public static boolean isBlockFilename(File f) {
String name = f.getName();
- if ( name.startsWith( "blk_" ) &&
- name.indexOf( '.' ) < 0 ) {
- return true;
- } else {
- return false;
- }
+ return blockFilePattern.matcher(name).matches();
+ }
+
+ public static long filename2id(String name) {
+ Matcher m = blockFilePattern.matcher(name);
+ return m.matches() ? Long.parseLong(m.group(1)) : 0;
}
- static long filename2id(String name) {
- return Long.parseLong(name.substring("blk_".length()));
+ public static boolean isMetaFilename(String name) {
+ return metaFilePattern.matcher(name).matches();
+ }
+
+ /**
+ * Get the generation stamp from the metafile name
+ */
+ public static long getGenerationStamp(String metaFile) {
+ Matcher m = metaFilePattern.matcher(metaFile);
+ return m.matches() ? Long.parseLong(m.group(2))
+ : GRANDFATHER_GENERATION_STAMP;
+ }
+
+ /**
+ * Get the blockId from the metafile name
+ */
+ public static long getBlockId(String metaFile) {
+ Matcher m = metaFilePattern.matcher(metaFile);
+ return m.matches() ? Long.parseLong(m.group(1)) : 0;
}
private long blockId;
@@ -96,7 +120,7 @@
/**
*/
public String getBlockName() {
- return "blk_" + String.valueOf(blockId);
+ return BLOCK_FILE_PREFIX + String.valueOf(blockId);
}
/**
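
The regex-based helpers above make the block and metadata file naming convention (blk_<blockid> and blk_<blockid>_<genstamp>.meta) explicit. A small self-contained usage sketch; the file names are invented for illustration:

    import java.io.File;
    import org.apache.hadoop.hdfs.protocol.Block;

    // Illustrative only: exercises the new static helpers on made-up names.
    class BlockNameSketch {
      public static void main(String[] args) {
        System.out.println(Block.isBlockFilename(new File("blk_4258719061")));  // true
        System.out.println(Block.filename2id("blk_4258719061"));                // 4258719061

        String metaName = "blk_4258719061_1005.meta";
        System.out.println(Block.isMetaFilename(metaName));                     // true
        System.out.println(Block.getBlockId(metaName));                         // 4258719061
        System.out.println(Block.getGenerationStamp(metaName));                 // 1005
      }
    }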
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=764031&r1=764030&r2=764031&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Fri Apr 10 20:14:14 2009
@@ -52,7 +52,13 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.StringUtils;
-/*
+/**
+ * Performs two types of scanning:
+ * <li> Gets block files from the data directories and reconciles the
+ * difference between the blocks on the disk and in memory in
+ * {@link FSDataset}</li>
+ * <li> Scans the data directories for block files and verifies that
+ * the files are not corrupt</li>
* This keeps track of blocks and their last verification times.
* Currently it does not modify the metadata for block.
*/
@@ -96,6 +102,9 @@
BlockTransferThrottler throttler = null;
+ // Reconciles blocks on disk with blocks in memory
+ DirectoryScanner dirScanner;
+
private static enum ScanType {
REMOTE_READ, // Verified when a block read by a client etc
VERIFICATION_SCAN, // scanned as part of periodic verification
@@ -143,6 +152,8 @@
}
scanPeriod *= 3600 * 1000;
// initialized when the scanner thread is started.
+
+ dirScanner = new DirectoryScanner(dataset, conf);
}
private synchronized boolean isInitiliazed() {
@@ -586,6 +597,10 @@
startNewPeriod();
}
}
+ if (dirScanner.newScanPeriod(now)) {
+ dirScanner.reconcile();
+ now = System.currentTimeMillis();
+ }
if ( (now - getEarliestScanTime()) >= scanPeriod ) {
verifyFirstBlock();
} else {
@@ -940,7 +955,6 @@
}
public static class Servlet extends HttpServlet {
-
public void doGet(HttpServletRequest request,
HttpServletResponse response) throws IOException {
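
For context, the hook added to the scanner's main loop (see the hunk above) follows a simple once-per-period trigger: when a new scan period has started, run the directory reconcile and then re-read the clock, since reconcile() can take a while. A self-contained sketch of that pattern, with an illustrative 6-hour period and invented names:

    // Illustrative trigger pattern only; the real period comes from
    // dfs.datanode.directoryscan.interval and reconcile() does the actual work.
    class DirectoryScanTriggerSketch {
      static final long SCAN_PERIOD_MS = 6L * 3600 * 1000; // 21600 s default
      static long lastScanTime = System.currentTimeMillis() - SCAN_PERIOD_MS;

      static boolean newScanPeriod(long now) {
        return now > lastScanTime + SCAN_PERIOD_MS;
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        if (newScanPeriod(now)) {
          // dirScanner.reconcile() runs here in DataBlockScanner
          lastScanTime = now;
          now = System.currentTimeMillis(); // refresh after a potentially long scan
        }
      }
    }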
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=764031&r1=764030&r2=764031&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Apr 10 20:14:14 2009
@@ -699,7 +699,6 @@
//
// Now loop for a long time....
//
-
while (shouldRun) {
try {
long startTime = now();
@@ -729,73 +728,10 @@
continue;
}
- // check if there are newly received blocks
- Block [] blockArray=null;
- String [] delHintArray=null;
- synchronized(receivedBlockList) {
- synchronized(delHints) {
- int numBlocks = receivedBlockList.size();
- if (numBlocks > 0) {
- if(numBlocks!=delHints.size()) {
- LOG.warn("Panic: receiveBlockList and delHints are not of the
same length" );
- }
- //
- // Send newly-received blockids to namenode
- //
- blockArray = receivedBlockList.toArray(new Block[numBlocks]);
- delHintArray = delHints.toArray(new String[numBlocks]);
- }
- }
- }
- if (blockArray != null) {
- if(delHintArray == null || delHintArray.length != blockArray.length ) {
- LOG.warn("Panic: block array & delHintArray are not the same" );
- }
- namenode.blockReceived(dnRegistration, blockArray, delHintArray);
- synchronized (receivedBlockList) {
- synchronized (delHints) {
- for(int i=0; i<blockArray.length; i++) {
- receivedBlockList.remove(blockArray[i]);
- delHints.remove(delHintArray[i]);
- }
- }
- }
- }
+ reportReceivedBlocks();
- // send block report
- if (startTime - lastBlockReport > blockReportInterval) {
- //
- // Send latest blockinfo report if timer has expired.
- // Get back a list of local block(s) that are obsolete
- // and can be safely GC'ed.
- //
- long brStartTime = now();
- Block[] bReport = data.getBlockReport();
- DatanodeCommand cmd = namenode.blockReport(dnRegistration,
- BlockListAsLongs.convertToArrayLongs(bReport));
- long brTime = now() - brStartTime;
- myMetrics.blockReports.inc(brTime);
- LOG.info("BlockReport of " + bReport.length +
- " blocks got processed in " + brTime + " msecs");
- //
- // If we have sent the first block report, then wait a random
- // time before we start the periodic block reports.
- //
- if (resetBlockReportTime) {
- lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
- resetBlockReportTime = false;
- } else {
- /* say the last block report was at 8:20:14. The current report
- * should have started around 9:20:14 (default 1 hour interval).
- * If current time is :
- * 1) normal like 9:20:18, next report should be at 10:20:14
- * 2) unexpected like 11:35:43, next report should be at 12:20:14
- */
- lastBlockReport += (now() - lastBlockReport) /
- blockReportInterval * blockReportInterval;
- }
- processCommand(cmd);
- }
+ DatanodeCommand cmd = blockReport();
+ processCommand(cmd);
// start block scanner
if (blockScanner != null && blockScannerThread == null &&
@@ -926,6 +862,88 @@
upgradeManager.processUpgradeCommand(comm);
}
+ /**
+ * Report received blocks and delete hints to the Namenode
+ * @throws IOException
+ */
+ private void reportReceivedBlocks() throws IOException {
+ //check if there are newly received blocks
+ Block [] blockArray=null;
+ String [] delHintArray=null;
+ synchronized(receivedBlockList) {
+ synchronized(delHints){
+ int numBlocks = receivedBlockList.size();
+ if (numBlocks > 0) {
+ if(numBlocks!=delHints.size()) {
+ LOG.warn("Panic: receiveBlockList and delHints are not of the same
length" );
+ }
+ //
+ // Send newly-received blockids to namenode
+ //
+ blockArray = receivedBlockList.toArray(new Block[numBlocks]);
+ delHintArray = delHints.toArray(new String[numBlocks]);
+ }
+ }
+ }
+ if (blockArray != null) {
+ if(delHintArray == null || delHintArray.length != blockArray.length ) {
+ LOG.warn("Panic: block array & delHintArray are not the same" );
+ }
+ namenode.blockReceived(dnRegistration, blockArray, delHintArray);
+ synchronized(receivedBlockList) {
+ synchronized(delHints){
+ for(int i=0; i<blockArray.length; i++) {
+ receivedBlockList.remove(blockArray[i]);
+ delHints.remove(delHintArray[i]);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Report the list of blocks to the Namenode
+ * @throws IOException
+ */
+ private DatanodeCommand blockReport() throws IOException {
+ // send block report
+ DatanodeCommand cmd = null;
+ long startTime = now();
+ if (startTime - lastBlockReport > blockReportInterval) {
+ //
+ // Send latest block report if timer has expired.
+ // Get back a list of local block(s) that are obsolete
+ // and can be safely GC'ed.
+ //
+ long brStartTime = now();
+ Block[] bReport = data.getBlockReport();
+
+ cmd = namenode.blockReport(dnRegistration,
+ BlockListAsLongs.convertToArrayLongs(bReport));
+ long brTime = now() - brStartTime;
+ myMetrics.blockReports.inc(brTime);
+ LOG.info("BlockReport of " + bReport.length +
+ " blocks got processed in " + brTime + " msecs");
+ //
+ // If we have sent the first block report, then wait a random
+ // time before we start the periodic block reports.
+ //
+ if (resetBlockReportTime) {
+ lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
+ resetBlockReportTime = false;
+ } else {
+ /* say the last block report was at 8:20:14. The current report
+ * should have started around 9:20:14 (default 1 hour interval).
+ * If current time is :
+ * 1) normal like 9:20:18, next report should be at 10:20:14
+ * 2) unexpected like 11:35:43, next report should be at 12:20:14
+ */
+ lastBlockReport += (now() - lastBlockReport) /
+ blockReportInterval * blockReportInterval;
+ }
+ }
+ return cmd;
+ }
/**
* Start distributed upgrade if it should be initiated by the data-node.
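
The rescheduling line in blockReport() uses integer division to snap lastBlockReport forward by whole intervals, which keeps the next report aligned with the original schedule even when a report runs late. A small numeric sketch with invented timestamps (milliseconds, relative to the 8:20:14 report in the comment above):

    // Illustrative arithmetic only; mirrors
    //   lastBlockReport += (now() - lastBlockReport) / blockReportInterval * blockReportInterval;
    class BlockReportScheduleSketch {
      public static void main(String[] args) {
        long interval = 3600000L;    // 1 hour
        long last = 0L;              // last report at t=0, i.e. 8:20:14

        long now = 3604000L;         // normal case: 9:20:18
        last += (now - last) / interval * interval;
        System.out.println(last);    // 3600000 -> next report due at 10:20:14

        last = 0L;
        now = 11729000L;             // late case: 11:35:43
        last += (now - last) / interval * interval;
        System.out.println(last);    // 10800000 -> next report due at 12:20:14
      }
    }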
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java?rev=764031&r1=764030&r2=764031&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java Fri Apr 10 20:14:14 2009
@@ -58,6 +58,10 @@
return file;
}
+ void setFile(File f) {
+ file = f;
+ }
+
/**
* Is this block already detached?
*/
Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=764031&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Fri Apr 10 20:14:14 2009
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/**
+ * Periodically scans the data directories for block and block metadata files.
+ * Reconciles the differences with block information maintained in
+ * {@link FSDataset}
+ */
+public class DirectoryScanner {
+ private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
+ private static final int DEFAULT_SCAN_INTERVAL = 21600;
+
+ private final FSDataset dataset;
+ private long scanPeriod;
+ private long lastScanTime;
+
+ LinkedList<ScanInfo> diff = new LinkedList<ScanInfo>();
+
+ /** Stats tracked for reporting and testing */
+ long totalBlocks;
+ long missingMetaFile;
+ long missingBlockFile;
+ long missingMemoryBlocks;
+ long mismatchBlocks;
+
+ /**
+ * Tracks the files and other information related to a block on the disk.
+ * A missing file is indicated by setting the corresponding member
+ * to null.
+ */
+ static class ScanInfo implements Comparable<ScanInfo> {
+ private final long blockId;
+ private final File metaFile;
+ private final File blockFile;
+ private final FSVolume volume;
+
+ ScanInfo(long blockId) {
+ this(blockId, null, null, null);
+ }
+
+ ScanInfo(long blockId, File blockFile, File metaFile, FSVolume vol) {
+ this.blockId = blockId;
+ this.metaFile = metaFile;
+ this.blockFile = blockFile;
+ this.volume = vol;
+ }
+
+ File getMetaFile() {
+ return metaFile;
+ }
+
+ File getBlockFile() {
+ return blockFile;
+ }
+
+ long getBlockId() {
+ return blockId;
+ }
+
+ FSVolume getVolume() {
+ return volume;
+ }
+
+ @Override // Comparable
+ public int compareTo(ScanInfo b) {
+ if (blockId < b.blockId) {
+ return -1;
+ } else if (blockId == b.blockId) {
+ return 0;
+ } else {
+ return 1;
+ }
+ }
+
+ @Override // Object
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ScanInfo)) {
+ return false;
+ }
+ return blockId == ((ScanInfo) o).blockId;
+ }
+
+ @Override // Object
+ public int hashCode() {
+ return 37 * 17 + (int) (blockId^(blockId>>>32));
+ }
+
+ public long getGenStamp() {
+ return metaFile != null ? Block.getGenerationStamp(metaFile.getName()) :
+ Block.GRANDFATHER_GENERATION_STAMP;
+ }
+ }
+
+ DirectoryScanner(FSDataset dataset, Configuration conf) {
+ this.dataset = dataset;
+ int interval = conf.getInt("dfs.datanode.directoryscan.interval",
+ DEFAULT_SCAN_INTERVAL);
+ scanPeriod = interval * 1000L;
+
+ Random rand = new Random();
+ lastScanTime = System.currentTimeMillis() - (rand.nextInt(interval) * 1000L);
+ LOG.info("scan starts at " + (lastScanTime + scanPeriod)
+ + " with interval " + scanPeriod);
+ }
+
+ boolean newScanPeriod(long now) {
+ return now > lastScanTime + scanPeriod;
+ }
+
+ private void clear() {
+ diff.clear();
+ totalBlocks = 0;
+ missingMetaFile = 0;
+ missingBlockFile = 0;
+ missingMemoryBlocks = 0;
+ mismatchBlocks = 0;
+ }
+
+ /**
+ * Reconcile differences between disk and in-memory blocks
+ */
+ void reconcile() {
+ scan();
+ for (ScanInfo info : diff) {
+ dataset.checkAndUpdate(info.getBlockId(), info.getBlockFile(), info
+ .getMetaFile(), info.getVolume());
+ }
+ }
+
+ /**
+ * Scan for the differences between disk and in-memory blocks
+ */
+ void scan() {
+ clear();
+ ScanInfo[] diskReport = getDiskReport();
+ totalBlocks = diskReport.length;
+
+ // Hold FSDataset lock to prevent further changes to the block map
+ synchronized(dataset) {
+ Block[] memReport = dataset.getBlockList(false);
+ Arrays.sort(memReport); // Sort based on blockId
+
+ int d = 0; // index for diskReport
+ int m = 0; // index for memReport
+ while (m < memReport.length && d < diskReport.length) {
+ Block memBlock = memReport[Math.min(m, memReport.length - 1)];
+ ScanInfo info = diskReport[Math.min(d, diskReport.length - 1)];
+ if (info.getBlockId() < memBlock.getBlockId()) {
+ // Block is missing in memory
+ missingMemoryBlocks++;
+ addDifference(info);
+ d++;
+ continue;
+ }
+ if (info.getBlockId() > memBlock.getBlockId()) {
+ // Block is missing on the disk
+ addDifference(memBlock.getBlockId());
+ m++;
+ continue;
+ }
+ // Block file and/or metadata file exists on the disk
+ // Block exists in memory
+ if (info.getBlockFile() == null) {
+ // Block metadata file exists and block file is missing
+ addDifference(info);
+ } else if (info.getGenStamp() != memBlock.getGenerationStamp()
+ || info.getBlockFile().length() != memBlock.getNumBytes()) {
+ mismatchBlocks++;
+ addDifference(info);
+ }
+ d++;
+ m++;
+ }
+ while (m < memReport.length) {
+ addDifference(memReport[m++].getBlockId());
+ }
+ while (d < diskReport.length) {
+ missingMemoryBlocks++;
+ addDifference(diskReport[d++]);
+ }
+ }
+ LOG.info("Total blocks: " + totalBlocks + ", missing metadata files:"
+ + missingMetaFile + ", missing block files:" + missingBlockFile
+ + ", missing blocks in memory:" + missingMemoryBlocks
+ + ", mismatched blocks:" + mismatchBlocks);
+ lastScanTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Block is found on the disk. In-memory block is missing or does not match
+ * the block on the disk
+ */
+ private void addDifference(ScanInfo info) {
+ missingMetaFile += info.getMetaFile() == null ? 1 : 0;
+ missingBlockFile += info.getBlockFile() == null ? 1 : 0;
+ diff.add(info);
+ }
+
+ /** Block is not found on the disk */
+ private void addDifference(long blockId) {
+ missingBlockFile++;
+ missingMetaFile++;
+ diff.add(new ScanInfo(blockId));
+ }
+
+ /** Get list of blocks on the disk sorted by blockId */
+ private ScanInfo[] getDiskReport() {
+ // First get list of data directories
+ FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
+ ArrayList<LinkedList<ScanInfo>> dirReports =
+ new ArrayList<LinkedList<ScanInfo>>(volumes.length);
+ for (int i = 0; i < volumes.length; i++) {
+ if (!dataset.volumes.isValid(volumes[i])) { // volume is no longer valid
+ dirReports.add(i, null);
+ } else {
+ LinkedList<ScanInfo> dirReport = new LinkedList<ScanInfo>();
+ dirReports.add(i, compileReport(volumes[i], volumes[i].getDir(), dirReport));
+ }
+ }
+
+ // Compile consolidated report for all the volumes
+ LinkedList<ScanInfo> list = new LinkedList<ScanInfo>();
+ for (int i = 0; i < volumes.length; i++) {
+ if (dataset.volumes.isValid(volumes[i])) { // volume is still valid
+ list.addAll(dirReports.get(i));
+ }
+ }
+
+ ScanInfo[] report = list.toArray(new ScanInfo[list.size()]);
+ // Sort the report based on blockId
+ Arrays.sort(report);
+ return report;
+ }
+
+ private static boolean isBlockMetaFile(String blockId, String metaFile) {
+ return metaFile.startsWith(blockId)
+ && metaFile.endsWith(Block.METADATA_EXTENSION);
+ }
+
+ /** Compile a list of {@link ScanInfo} for the blocks in the directory <dir> */
+ private LinkedList<ScanInfo> compileReport(FSVolume vol, File dir,
+ LinkedList<ScanInfo> report) {
+ File[] files = dir.listFiles();
+ Arrays.sort(files);
+
+ /* Assumption: In the sorted list of files block file appears immediately
+ * before block metadata file. This is true for the current naming
+ * convention for block file blk_<blockid> and meta file
+ * blk_<blockid>_<genstamp>.meta
+ */
+ for (int i = 0; i < files.length; i++) {
+ if (files[i].isDirectory()) {
+ compileReport(vol, files[i], report);
+ continue;
+ }
+ if (!Block.isBlockFilename(files[i])) {
+ if (isBlockMetaFile("blk_", files[i].getName())) {
+ long blockId = Block.getBlockId(files[i].getName());
+ report.add(new ScanInfo(blockId, null, files[i], vol));
+ }
+ continue;
+ }
+ File blockFile = files[i];
+ long blockId = Block.filename2id(blockFile.getName());
+ File metaFile = null;
+
+ // Skip all the files that start with block name until
+ // getting to the metafile for the block
+ while (i + 1 < files.length
+ && files[i+1].isFile()
+ && files[i + 1].getName().startsWith(blockFile.getName())) {
+ i++;
+ if (isBlockMetaFile(blockFile.getName(), files[i].getName())) {
+ metaFile = files[i];
+ break;
+ }
+ }
+ report.add(new ScanInfo(blockId, blockFile, metaFile, vol));
+ }
+ return report;
+ }
+}
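
DirectoryScanner.scan() above is essentially a merge over two blockId-sorted lists: the report compiled from the data directories and the block list taken from FSDataset. A stripped-down, self-contained sketch of that merge; real ScanInfo entries also carry files, generation stamps and lengths, which are compared when the ids match:

    // Illustrative merge only, over plain ids instead of ScanInfo/Block objects.
    class ScanMergeSketch {
      public static void main(String[] args) {
        long[] disk = {1, 2, 4, 7};  // blockIds found on disk, sorted
        long[] mem  = {2, 3, 4, 7};  // blockIds known in memory, sorted
        int d = 0, m = 0;
        while (d < disk.length && m < mem.length) {
          if (disk[d] < mem[m]) {
            System.out.println("block " + disk[d++] + " missing in memory");
          } else if (disk[d] > mem[m]) {
            System.out.println("block " + mem[m++] + " missing on disk");
          } else {
            // same id on both sides: scan() compares meta file, genstamp and length here
            d++;
            m++;
          }
        }
        while (m < mem.length) System.out.println("block " + mem[m++] + " missing on disk");
        while (d < disk.length) System.out.println("block " + disk[d++] + " missing in memory");
      }
    }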
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=764031&r1=764030&r2=764031&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Apr 10 20:14:14 2009
@@ -26,10 +26,13 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.conf.*;
@@ -158,41 +161,16 @@
if (!path.startsWith(blockName)) {
continue;
}
- String[] vals = path.split("_");
- if (vals.length != 3) { // blk, blkid, genstamp.meta
+ if (blockFile == listdir[j]) {
continue;
}
- String[] str = vals[2].split("\\.");
- if (str.length != 2) {
- continue;
- }
- return Long.parseLong(str[0]);
+ return Block.getGenerationStamp(listdir[j].getName());
}
DataNode.LOG.warn("Block " + blockFile +
" does not have a metafile!");
return Block.GRANDFATHER_GENERATION_STAMP;
}
- /**
- * Populate the given blockSet with any child blocks
- * found at this node.
- */
- public void getBlockInfo(TreeSet<Block> blockSet) {
- if (children != null) {
- for (int i = 0; i < children.length; i++) {
- children[i].getBlockInfo(blockSet);
- }
- }
-
- File blockFiles[] = dir.listFiles();
- for (int i = 0; i < blockFiles.length; i++) {
- if (Block.isBlockFilename(blockFiles[i])) {
- long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
- blockSet.add(new Block(blockFiles[i], blockFiles[i].length(), genStamp));
- }
- }
- }
-
void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap, FSVolume volume) {
if (children != null) {
for (int i = 0; i < children.length; i++) {
@@ -425,10 +403,6 @@
DiskChecker.checkDir(tmpDir);
}
- void getBlockInfo(TreeSet<Block> blockSet) {
- dataDir.getBlockInfo(blockSet);
- }
-
void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
dataDir.getVolumeMap(volumeMap, this);
}
@@ -506,7 +480,7 @@
return dfsUsed;
}
- synchronized long getCapacity() throws IOException {
+ long getCapacity() throws IOException {
long capacity = 0L;
for (int idx = 0; idx < volumes.length; idx++) {
capacity += volumes[idx].getCapacity();
@@ -514,7 +488,7 @@
return capacity;
}
- synchronized long getRemaining() throws IOException {
+ long getRemaining() throws IOException {
long remaining = 0L;
for (int idx = 0; idx < volumes.length; idx++) {
remaining += volumes[idx].getAvailable();
@@ -522,12 +496,6 @@
return remaining;
}
- synchronized void getBlockInfo(TreeSet<Block> blockSet) {
- for (int idx = 0; idx < volumes.length; idx++) {
- volumes[idx].getBlockInfo(blockSet);
- }
- }
-
synchronized void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
for (int idx = 0; idx < volumes.length; idx++) {
volumes[idx].getVolumeMap(volumeMap);
@@ -548,6 +516,15 @@
}
return sb.toString();
}
+
+ public boolean isValid(FSVolume volume) {
+ for (int idx = 0; idx < volumes.length; idx++) {
+ if (volumes[idx] == volume) {
+ return true;
+ }
+ }
+ return false;
+ }
}
//////////////////////////////////////////////////////
@@ -676,9 +653,12 @@
FSVolumeSet volumes;
private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
private int maxBlocksPerDir = 0;
- private HashMap<Block,DatanodeBlockInfo> volumeMap = null;
+ HashMap<Block,DatanodeBlockInfo> volumeMap = null;
static Random random = new Random();
-
+
+ // Used for synchronizing access to usage stats
+ private Object statsLock = new Object();
+
/**
* An FSDataset has a directory where it loads its data files.
*/
@@ -698,21 +678,27 @@
* Return the total space used by dfs datanode
*/
public long getDfsUsed() throws IOException {
- return volumes.getDfsUsed();
+ synchronized(statsLock) {
+ return volumes.getDfsUsed();
+ }
}
/**
* Return total capacity, used and unused
*/
public long getCapacity() throws IOException {
- return volumes.getCapacity();
+ synchronized(statsLock) {
+ return volumes.getCapacity();
+ }
}
/**
* Return how many bytes can still be stored in the FSDataset
*/
public long getRemaining() throws IOException {
- return volumes.getRemaining();
+ synchronized(statsLock) {
+ return volumes.getRemaining();
+ }
}
/**
@@ -1207,19 +1193,36 @@
}
return true;
}
-
+
/**
- * Return a table of block data
+ * Return finalized blocks from the in-memory blockmap
*/
public Block[] getBlockReport() {
- TreeSet<Block> blockSet = new TreeSet<Block>();
- volumes.getBlockInfo(blockSet);
- Block blockTable[] = new Block[blockSet.size()];
- int i = 0;
- for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {
- blockTable[i] = it.next();
+ ArrayList<Block> list = new ArrayList<Block>(volumeMap.size());
+ synchronized(this) {
+ for (Block b : volumeMap.keySet()) {
+ if (!ongoingCreates.containsKey(b)) {
+ list.add(new Block(b));
+ }
+ }
+ }
+ return list.toArray(new Block[list.size()]);
+ }
+
+ /**
+ * Get the block list from in-memory blockmap. Note if <deepcopy>
+ * is false, reference to the block in the volumeMap is returned. This block
+ * should not be changed. Suitable synchronization using {@link FSDataset}
+ * is needed to handle concurrent modification to the block.
+ */
+ synchronized Block[] getBlockList(boolean deepcopy) {
+ Block[] list = volumeMap.keySet().toArray(new Block[volumeMap.size()]);
+ if (deepcopy) {
+ for (int i = 0; i < list.length; i++) {
+ list[i] = new Block(list[i]);
+ }
}
- return blockTable;
+ return list;
}
/**
@@ -1430,4 +1433,190 @@
public String getStorageInfo() {
return toString();
}
+
+ /**
+ * Reconcile the difference between blocks on the disk and blocks in
+ * volumeMap
+ *
+ * Check the given block for inconsistencies. Look at the
+ * current state of the block and reconcile the differences as follows:
+ * <ul>
+ * <li>If the block file is missing, delete the block from volumeMap</li>
+ * <li>If the block file exists and the block is missing in volumeMap,
+ * add the block to volumeMap</li>
+ * <li>If the generation stamp does not match, then update the block with the right
+ * generation stamp</li>
+ * <li>If the block length in memory does not match the actual block file length
+ * then mark the block as corrupt and update the block length in memory</li>
+ * <li>If the file in {@link DatanodeBlockInfo} does not match the file on
+ * the disk, update {@link DatanodeBlockInfo} with the correct file</li>
+ * </ul>
+ *
+ * @param blockId Block that differs
+ * @param diskFile Block file on the disk
+ * @param diskMetaFile Metadata file found on the disk
+ * @param vol Volume of the block file
+ */
+ public void checkAndUpdate(long blockId, File diskFile,
+ File diskMetaFile, FSVolume vol) {
+ Block block = new Block(blockId);
+ DataNode datanode = DataNode.getDataNode();
+ Block corruptBlock = null;
+ synchronized (this) {
+ if (ongoingCreates.get(block) != null) {
+ // Block is not finalized - ignore the difference
+ return;
+ }
+
+ final long diskGS = diskMetaFile != null && diskMetaFile.exists() ?
+ Block.getGenerationStamp(diskMetaFile.getName()) :
+ Block.GRANDFATHER_GENERATION_STAMP;
+
+ DatanodeBlockInfo memBlockInfo = volumeMap.get(block);
+ if (diskFile == null || !diskFile.exists()) {
+ if (memBlockInfo == null) {
+ // Block file does not exist and block does not exist in memory
+ // If metadata file exists then delete it
+ if (diskMetaFile != null && diskMetaFile.exists()
+ && diskMetaFile.delete()) {
+ DataNode.LOG.warn("Deleted a metadata file without a block "
+ + diskMetaFile.getAbsolutePath());
+ }
+ return;
+ }
+ if (!memBlockInfo.getFile().exists()) {
+ // Block is in memory and not on the disk
+ // Remove the block from volumeMap
+ volumeMap.remove(block);
+ if (datanode.blockScanner != null) {
+ datanode.blockScanner.deleteBlock(block);
+ }
+ DataNode.LOG.warn("Removed block " + block.getBlockId()
+ + " from memory with missing block file on the disk");
+ // Finally remove the metadata file
+ if (diskMetaFile != null && diskMetaFile.exists()
+ && diskMetaFile.delete()) {
+ DataNode.LOG.warn("Deleted a metadata file for the deleted block "
+ + diskMetaFile.getAbsolutePath());
+ }
+ }
+ return;
+ }
+ /*
+ * Block file exists on the disk
+ */
+ if (memBlockInfo == null) {
+ // Block is missing in memory - add the block to volumeMap
+ DatanodeBlockInfo diskBlockInfo = new DatanodeBlockInfo(vol, diskFile);
+ Block diskBlock = new Block(diskFile, diskFile.length(), diskGS);
+ volumeMap.put(diskBlock, diskBlockInfo);
+ if (datanode.blockScanner != null) {
+ datanode.blockScanner.addBlock(diskBlock);
+ }
+ DataNode.LOG.warn("Added missing block to memory " + diskBlock);
+ return;
+ }
+ /*
+ * Block exists in volumeMap and the block file exists on the disk
+ */
+ // Iterate to get key from volumeMap for the blockId
+ Block memBlock = getBlockKey(blockId);
+
+ // Compare block files
+ File memFile = memBlockInfo.getFile();
+ if (memFile.exists()) {
+ if (memFile.compareTo(diskFile) != 0) {
+ DataNode.LOG.warn("Block file " + memFile.getAbsolutePath()
+ + " does not match file found by scan "
+ + diskFile.getAbsolutePath());
+ // TODO: Should the diskFile be deleted?
+ }
+ } else {
+ // Block refers to a block file that does not exist.
+ // Update the block with the file found on the disk. Since the block
+ // file and metadata file are found as a pair on the disk, update
+ // the block based on the metadata file found on the disk
+ DataNode.LOG.warn("Block file in volumeMap "
+ + memFile.getAbsolutePath()
+ + " does not exist. Updating it to the file found during scan "
+ + diskFile.getAbsolutePath());
+ DatanodeBlockInfo info = volumeMap.remove(memBlock);
+ info.setFile(diskFile);
+ memFile = diskFile;
+
+ DataNode.LOG.warn("Updating generation stamp for block " + blockId
+ + " from " + memBlock.getGenerationStamp() + " to " + diskGS);
+ memBlock.setGenerationStamp(diskGS);
+ volumeMap.put(memBlock, info);
+ }
+
+ // Compare generation stamp
+ if (memBlock.getGenerationStamp() != diskGS) {
+ File memMetaFile = getMetaFile(diskFile, memBlock);
+ if (memMetaFile.exists()) {
+ if (memMetaFile.compareTo(diskMetaFile) != 0) {
+ DataNode.LOG.warn("Metadata file in memory "
+ + memMetaFile.getAbsolutePath()
+ + " does not match file found by scan "
+ + diskMetaFile.getAbsolutePath());
+ }
+ } else {
+ // Metadata file corresponding to block in memory is missing
+ // If metadata file found during the scan is on the same directory
+ // as the block file, then use the generation stamp from it
+ long gs = diskMetaFile != null && diskMetaFile.exists()
+ && diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS
+ : Block.GRANDFATHER_GENERATION_STAMP;
+
+ DataNode.LOG.warn("Updating generation stamp for block " + blockId
+ + " from " + memBlock.getGenerationStamp() + " to " + gs);
+
+ DatanodeBlockInfo info = volumeMap.remove(memBlock);
+ memBlock.setGenerationStamp(gs);
+ volumeMap.put(memBlock, info);
+ }
+ }
+
+ // Compare block size
+ if (memBlock.getNumBytes() != memFile.length()) {
+ // Update the length based on the block file
+ corruptBlock = new Block(memBlock);
+ DataNode.LOG.warn("Updating size of block " + blockId + " from "
+ + memBlock.getNumBytes() + " to " + memFile.length());
+ DatanodeBlockInfo info = volumeMap.remove(memBlock);
+ memBlock.setNumBytes(memFile.length());
+ volumeMap.put(memBlock, info);
+ }
+ }
+
+ // Send corrupt block report outside the lock
+ if (corruptBlock != null) {
+ DatanodeInfo[] dnArr = { new DatanodeInfo(datanode.dnRegistration) };
+ LocatedBlock[] blocks = { new LocatedBlock(corruptBlock, dnArr) };
+ try {
+ datanode.namenode.reportBadBlocks(blocks);
+ DataNode.LOG.warn("Reporting the block " + corruptBlock
+ + " as corrupt due to length mismatch");
+ } catch (IOException e) {
+ DataNode.LOG.warn("Failed to repot bad block " + corruptBlock
+ + "Exception:" + StringUtils.stringifyException(e));
+ }
+ }
+ }
+
+ /**
+ * Get reference to the key in the volumeMap. To be called from methods that
+ * are synchronized on {@link FSDataset}
+ * @param blockId
+ * @return key from the volumeMap
+ */
+ Block getBlockKey(long blockId) {
+ assert(Thread.holdsLock(this));
+ for (Block b : volumeMap.keySet()) {
+ if (b.getBlockId() == blockId) {
+ return b;
+ }
+ }
+ return null;
+ }
}
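
The rules listed in the checkAndUpdate() javadoc boil down to a small decision tree. A self-contained illustration of that decision logic on plain values; it is not the real implementation, which also updates volumeMap, the block scanner and the namenode for each case:

    // Illustrative decision logic only, derived from the checkAndUpdate() javadoc.
    class ReconcileDecisionSketch {
      enum Action { DELETE_FROM_MEMORY, ADD_TO_MEMORY, FIX_GENSTAMP, MARK_CORRUPT, NONE }

      static Action decide(boolean blockFileOnDisk, boolean blockInMemory,
                           long diskGenStamp, long memGenStamp,
                           long diskLength, long memLength) {
        if (!blockFileOnDisk) {
          // block file missing: drop the in-memory entry if present
          // (a stray metadata file is also deleted in the real code)
          return blockInMemory ? Action.DELETE_FROM_MEMORY : Action.NONE;
        }
        if (!blockInMemory) {
          // block file exists but is unknown in memory: add it to volumeMap
          return Action.ADD_TO_MEMORY;
        }
        if (diskGenStamp != memGenStamp) {
          // trust the generation stamp recovered from the on-disk meta file
          return Action.FIX_GENSTAMP;
        }
        if (diskLength != memLength) {
          // length mismatch: report the block as corrupt and fix the in-memory size
          return Action.MARK_CORRUPT;
        }
        return Action.NONE;
      }
    }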
Added: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java?rev=764031&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java Fri Apr 10 20:14:14 2009
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.util.Random;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests {@link DirectoryScanner} handling of differences
+ * between blocks on the disk and blocks in memory.
+ */
+public class TestDirectoryScanner extends TestCase {
+ private static final Log LOG = LogFactory.getLog(TestDirectoryScanner.class);
+ private static final Configuration CONF = new Configuration();
+ private static final int DEFAULT_GEN_STAMP = 9999;
+
+ private MiniDFSCluster cluster;
+ private FSDataset fds = null;
+ private DirectoryScanner scanner = null;
+ private Random rand = new Random();
+ private Random r = new Random();
+
+ static {
+ CONF.setLong("dfs.block.size", 100);
+ CONF.setInt("io.bytes.per.checksum", 1);
+ CONF.setLong("dfs.heartbeat.interval", 1L);
+ }
+
+ /** create a file with a length of <code>fileLen</code> */
+ private void createFile(String fileName, long fileLen) throws IOException {
+ FileSystem fs = cluster.getFileSystem();
+ Path filePath = new Path(fileName);
+ DFSTestUtil.createFile(fs, filePath, fileLen, (short) 1, r.nextLong());
+ }
+
+ /** Truncate a block file */
+ private long truncateBlockFile() throws IOException {
+ synchronized (fds) {
+ for (Entry<Block, DatanodeBlockInfo> entry : fds.volumeMap.entrySet()) {
+ Block b = entry.getKey();
+ File f = entry.getValue().getFile();
+ File mf = FSDataset.getMetaFile(f, b);
+ // Truncate a block file that has a corresponding metadata file
+ if (f.exists() && f.length() != 0 && mf.exists()) {
+ FileOutputStream s = new FileOutputStream(f);
+ FileChannel channel = s.getChannel();
+ channel.truncate(0);
+ LOG.info("Truncated block file " + f.getAbsolutePath());
+ return entry.getKey().getBlockId();
+ }
+ }
+ }
+ return 0;
+ }
+
+ /** Delete a block file */
+ private long deleteBlockFile() {
+ synchronized(fds) {
+ for (Entry<Block, DatanodeBlockInfo> entry : fds.volumeMap.entrySet()) {
+ Block b = entry.getKey();
+ File f = entry.getValue().getFile();
+ File mf = FSDataset.getMetaFile(f, b);
+ // Delete a block file that has corresponding metadata file
+ if (f.exists() && mf.exists() && f.delete()) {
+ LOG.info("Deleting block file " + f.getAbsolutePath());
+ return entry.getKey().getBlockId();
+ }
+ }
+ }
+ return 0;
+ }
+
+ /** Delete block meta file */
+ private long deleteMetaFile() {
+ synchronized(fds) {
+ for (Entry<Block, DatanodeBlockInfo> entry : fds.volumeMap.entrySet()) {
+ Block b = entry.getKey();
+ String blkfile = entry.getValue().getFile().getAbsolutePath();
+ long genStamp = b.getGenerationStamp();
+ String metafile = FSDataset.getMetaFileName(blkfile, genStamp);
+ File file = new File(metafile);
+ // Delete a metadata file
+ if (file.exists() && file.delete()) {
+ LOG.info("Deleting metadata file " + file.getAbsolutePath());
+ return entry.getKey().getBlockId();
+ }
+ }
+ }
+ return 0;
+ }
+
+ /** Get a random blockId that is not used already */
+ private long getFreeBlockId() {
+ long id = rand.nextLong();
+ while (true) {
+ id = rand.nextLong();
+ Block b = new Block(id);
+ DatanodeBlockInfo info = null;
+ synchronized(fds) {
+ info = fds.volumeMap.get(b);
+ }
+ if (info == null) {
+ break;
+ }
+ }
+ return id;
+ }
+
+ private String getBlockFile(long id) {
+ return Block.BLOCK_FILE_PREFIX + id;
+ }
+
+ private String getMetaFile(long id) {
+ return Block.BLOCK_FILE_PREFIX + id + "_" + DEFAULT_GEN_STAMP
+ + Block.METADATA_EXTENSION;
+ }
+
+ /** Create a block file in a random volume*/
+ private long createBlockFile() throws IOException {
+ FSVolume[] volumes = fds.volumes.volumes;
+ int index = rand.nextInt(volumes.length - 1);
+ long id = getFreeBlockId();
+ File file = new File(volumes[index].getDir().getPath(), getBlockFile(id));
+ if (file.createNewFile()) {
+ LOG.info("Created block file " + file.getName());
+ }
+ return id;
+ }
+
+ /** Create a metafile in a random volume*/
+ private long createMetaFile() throws IOException {
+ FSVolume[] volumes = fds.volumes.volumes;
+ int index = rand.nextInt(volumes.length - 1);
+ long id = getFreeBlockId();
+ File file = new File(volumes[index].getDir().getPath(), getMetaFile(id));
+ if (file.createNewFile()) {
+ LOG.info("Created metafile " + file.getName());
+ }
+ return id;
+ }
+
+ /** Create block file and corresponding metafile in a random volume */
+ private long createBlockMetaFile() throws IOException {
+ FSVolume[] volumes = fds.volumes.volumes;
+ int index = rand.nextInt(volumes.length - 1);
+ long id = getFreeBlockId();
+ File file = new File(volumes[index].getDir().getPath(), getBlockFile(id));
+ if (file.createNewFile()) {
+ LOG.info("Created block file " + file.getName());
+
+ // Create files with same prefix as block file but extension names
+ // such that during sorting, these files appear around meta file
+ // to test how DirectoryScanner handles extraneous files
+ String name1 = file.getAbsolutePath() + ".l";
+ String name2 = file.getAbsolutePath() + ".n";
+ file = new File(name1);
+ if (file.createNewFile()) {
+ LOG.info("Created extraneous file " + name1);
+ }
+
+ file = new File(name2);
+ if (file.createNewFile()) {
+ LOG.info("Created extraneous file " + name2);
+ }
+
+ file = new File(volumes[index].getDir().getPath(), getMetaFile(id));
+ if (file.createNewFile()) {
+ LOG.info("Created metafile " + file.getName());
+ }
+ }
+ return id;
+ }
+
+ private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
+ long missingMemoryBlocks, long mismatchBlocks) {
+ scanner.reconcile();
+ assertEquals(totalBlocks, scanner.totalBlocks);
+ assertEquals(diffsize, scanner.diff.size());
+ assertEquals(missingMetaFile, scanner.missingMetaFile);
+ assertEquals(missingBlockFile, scanner.missingBlockFile);
+ assertEquals(missingMemoryBlocks, scanner.missingMemoryBlocks);
+ assertEquals(mismatchBlocks, scanner.mismatchBlocks);
+ }
+
+ public void test() throws Exception {
+ cluster = new MiniDFSCluster(CONF, 1, true, null);
+ try {
+ cluster.waitActive();
+ fds = (FSDataset) cluster.getDataNodes().get(0).getFSDataset();
+ scanner = new DirectoryScanner(fds, CONF);
+
+ // Add files with 100 blocks
+ createFile("/tmp/t1", 10000);
+ long totalBlocks = 100;
+
+ // Test1: No difference between in-memory and disk
+ scan(100, 0, 0, 0, 0, 0);
+
+ // Test2: block metafile is missing
+ long blockId = deleteMetaFile();
+ scan(totalBlocks, 1, 1, 0, 0, 1);
+ verifyGenStamp(blockId, Block.GRANDFATHER_GENERATION_STAMP);
+ scan(totalBlocks, 0, 0, 0, 0, 0);
+
+ // Test3: block file is missing
+ blockId = deleteBlockFile();
+ scan(totalBlocks, 1, 0, 1, 0, 0);
+ totalBlocks--;
+ verifyDeletion(blockId);
+ scan(totalBlocks, 0, 0, 0, 0, 0);
+
+ // Test4: A block file exists for which there is no metafile and
+ // a block in memory
+ blockId = createBlockFile();
+ totalBlocks++;
+ scan(totalBlocks, 1, 1, 0, 1, 0);
+ verifyAddition(blockId, Block.GRANDFATHER_GENERATION_STAMP, 0);
+ scan(totalBlocks, 0, 0, 0, 0, 0);
+
+ // Test5: A metafile exists for which there is no block file and
+ // a block in memory
+ blockId = createMetaFile();
+ scan(totalBlocks+1, 1, 0, 1, 1, 0);
+ File metafile = new File(getMetaFile(blockId));
+ assertTrue(!metafile.exists());
+ scan(totalBlocks, 0, 0, 0, 0, 0);
+
+ // Test6: A block file and metafile exists for which there is no block in
+ // memory
+ blockId = createBlockMetaFile();
+ totalBlocks++;
+ scan(totalBlocks, 1, 0, 0, 1, 0);
+ verifyAddition(blockId, DEFAULT_GEN_STAMP, 0);
+ scan(totalBlocks, 0, 0, 0, 0, 0);
+
+ // Test7: Delete bunch of metafiles
+ for (int i = 0; i < 10; i++) {
+ blockId = deleteMetaFile();
+ }
+ scan(totalBlocks, 10, 10, 0, 0, 10);
+ scan(totalBlocks, 0, 0, 0, 0, 0);
+
+ // Test8: Delete bunch of block files
+ for (int i = 0; i < 10; i++) {
+ blockId = deleteBlockFile();
+ }
+ scan(totalBlocks, 10, 0, 10, 0, 0);
+ totalBlocks -= 10;
+ scan(totalBlocks, 0, 0, 0, 0, 0);
+
+ // Test9: create a bunch of blocks files
+ for (int i = 0; i < 10 ; i++) {
+ blockId = createBlockFile();
+ }
+ totalBlocks += 10;
+ scan(totalBlocks, 10, 10, 0, 10, 0);
+ scan(totalBlocks, 0, 0, 0, 0, 0);
+
+ // Test10: create a bunch of metafiles
+ for (int i = 0; i < 10 ; i++) {
+ blockId = createMetaFile();
+ }
+ scan(totalBlocks+10, 10, 0, 10, 10, 0);
+ scan(totalBlocks, 0, 0, 0, 0, 0);
+
+ // Test11: create a bunch block files and meta files
+ for (int i = 0; i < 10 ; i++) {
+ blockId = createBlockMetaFile();
+ }
+ totalBlocks += 10;
+ scan(totalBlocks, 10, 0, 0, 10, 0);
+ scan(totalBlocks, 0, 0, 0, 0, 0);
+
+ // Test12: truncate block files to test block length mismatch
+ for (int i = 0; i < 10 ; i++) {
+ truncateBlockFile();
+ }
+ scan(totalBlocks, 10, 0, 0, 0, 10);
+ scan(totalBlocks, 0, 0, 0, 0, 0);
+
+ // Test13: all the conditions combined
+ createMetaFile();
+ createBlockFile();
+ createBlockMetaFile();
+ deleteMetaFile();
+ deleteBlockFile();
+ truncateBlockFile();
+ scan(totalBlocks+3, 6, 2, 2, 3, 2);
+ scan(totalBlocks+1, 0, 0, 0, 0, 0);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ private void verifyAddition(long blockId, long genStamp, long size) {
+ Block memBlock = fds.getBlockKey(blockId);
+ assertNotNull(memBlock);
+ DatanodeBlockInfo blockInfo;
+ synchronized(fds) {
+ blockInfo = fds.volumeMap.get(memBlock);
+ }
+ assertNotNull(blockInfo);
+
+ // Added block has the same file as the one created by the test
+ File file = new File(getBlockFile(blockId));
+ assertEquals(file.getName(), blockInfo.getFile().getName());
+
+ // Generation stamp is same as that of created file
+ assertEquals(genStamp, memBlock.getGenerationStamp());
+
+ // File size matches
+ assertEquals(size, memBlock.getNumBytes());
+ }
+
+ private void verifyDeletion(long blockId) {
+ // Ensure block does not exist in memory
+ synchronized(fds) {
+ assertEquals(null, fds.volumeMap.get(new Block(blockId)));
+ }
+ }
+
+ private void verifyGenStamp(long blockId, long genStamp) {
+ Block memBlock;
+ synchronized(fds) {
+ memBlock = fds.getBlockKey(blockId);
+ }
+ assertNotNull(memBlock);
+ assertEquals(genStamp, memBlock.getGenerationStamp());
+ }
+}