Author: szetszwo
Date: Thu Mar 15 18:24:40 2012
New Revision: 1301127
URL: http://svn.apache.org/viewvc?rev=1301127&view=rev
Log:
HDFS-3005. FSVolume.decDfsUsed(..) should be synchronized.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1301127&r1=1301126&r2=1301127&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Mar 15 18:24:40 2012
@@ -298,6 +298,8 @@ Release 0.23.3 - UNRELEASED
HDFS-3093. Fix bug where namenode -format interpreted the -force flag in
reverse. (todd)
+ HDFS-3005. FSVolume.decDfsUsed(..) should be synchronized. (szetszwo)
+
BREAKDOWN OF HDFS-1623 SUBTASKS
HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1301127&r1=1301126&r2=1301127&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Mar 15 18:24:40 2012
@@ -95,15 +95,18 @@ class FSDataset implements FSDatasetInte
* A node type that can be built into a tree reflecting the
* hierarchy of blocks on the local disk.
*/
- private class FSDir {
+ private static class FSDir {
+ final int maxBlocksPerDir;
final File dir;
int numBlocks = 0;
FSDir children[];
int lastChildIdx = 0;
- private FSDir(File dir)
+ private FSDir(File dir, int maxBlocksPerDir)
throws IOException {
this.dir = dir;
+ this.maxBlocksPerDir = maxBlocksPerDir;
+
this.children = null;
if (!dir.exists()) {
if (!dir.mkdirs()) {
@@ -115,7 +118,7 @@ class FSDataset implements FSDatasetInte
List<FSDir> dirList = new ArrayList<FSDir>();
for (int idx = 0; idx < files.length; idx++) {
if (files[idx].isDirectory()) {
- dirList.add(new FSDir(files[idx]));
+ dirList.add(new FSDir(files[idx], maxBlocksPerDir));
} else if (Block.isBlockFilename(files[idx])) {
numBlocks++;
}
@@ -165,7 +168,8 @@ class FSDataset implements FSDatasetInte
if (children == null || children.length == 0) {
children = new FSDir[maxBlocksPerDir];
for (int idx = 0; idx < maxBlocksPerDir; idx++) {
- children[idx] = new FSDir(new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx));
+ final File sub = new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx);
+ children[idx] = new FSDir(sub, maxBlocksPerDir);
}
}
@@ -297,8 +301,10 @@ class FSDataset implements FSDatasetInte
* A BlockPoolSlice represents a portion of a BlockPool stored on a volume.
* Taken together, all BlockPoolSlices sharing a block pool ID across a
* cluster represent a single block pool.
+ *
+ * This class is synchronized by {@link FSVolume}.
*/
- private class BlockPoolSlice {
+ private static class BlockPoolSlice {
private final String bpid;
private final FSVolume volume; // volume to which this BlockPool belongs to
private final File currentDir; // StorageDirectory/current/bpid/current
@@ -335,10 +341,16 @@ class FSDataset implements FSDatasetInte
FileUtil.fullyDelete(tmpDir);
}
this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
+ final boolean supportAppends = conf.getBoolean(
+ DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
+ DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
if (rbwDir.exists() && !supportAppends) {
FileUtil.fullyDelete(rbwDir);
}
- this.finalizedDir = new FSDir(finalizedDir);
+ final int maxBlocksPerDir = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
+ DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
+ this.finalizedDir = new FSDir(finalizedDir, maxBlocksPerDir);
if (!rbwDir.mkdirs()) { // create rbw directory if not exist
if (!rbwDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + rbwDir.toString());
@@ -365,12 +377,12 @@ class FSDataset implements FSDatasetInte
return rbwDir;
}
+ /**
+ * This should be used only by {@link FSVolume#decDfsUsed(String, long)}
+ * and it will be synchronized there.
+ */
void decDfsUsed(long value) {
- // The caller to this method (BlockFileDeleteTask.run()) does
- // not have locked FSDataset.this yet.
- synchronized(FSDataset.this) {
- dfsUsage.decDfsUsed(value);
- }
+ dfsUsage.decDfsUsed(value);
}
long getDfsUsed() throws IOException {
@@ -530,14 +542,22 @@ class FSDataset implements FSDatasetInte
dfsUsage.shutdown();
}
}
-
- class FSVolume implements FSVolumeInterface {
+
+ /**
+ * The underlying volume used to store replica.
+ *
+ * It uses the {@link FSDataset} object for synchronization.
+ */
+ static class FSVolume implements FSVolumeInterface {
+ private final FSDataset dataset;
private final Map<String, BlockPoolSlice> map = new HashMap<String,
BlockPoolSlice>();
private final File currentDir; // <StorageDirectory>/current
private final DF usage;
private final long reserved;
- FSVolume(File currentDir, Configuration conf) throws IOException {
+ FSVolume(FSDataset dataset, File currentDir, Configuration conf
+ ) throws IOException {
+ this.dataset = dataset;
this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
this.currentDir = currentDir;
@@ -555,9 +575,7 @@ class FSDataset implements FSDatasetInte
}
void decDfsUsed(String bpid, long value) {
- // The caller to this method (BlockFileDeleteTask.run()) does
- // not have locked FSDataset.this yet.
- synchronized(FSDataset.this) {
+ synchronized(dataset) {
BlockPoolSlice bp = map.get(bpid);
if (bp != null) {
bp.decDfsUsed(value);
@@ -566,11 +584,11 @@ class FSDataset implements FSDatasetInte
}
long getDfsUsed() throws IOException {
- // TODO valid synchronization
long dfsUsed = 0;
- Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
- for (Entry<String, BlockPoolSlice> entry : set) {
- dfsUsed += entry.getValue().getDfsUsed();
+ synchronized(dataset) {
+ for(BlockPoolSlice s : map.values()) {
+ dfsUsed += s.getDfsUsed();
+ }
}
return dfsUsed;
}
@@ -630,11 +648,11 @@ class FSDataset implements FSDatasetInte
*/
@Override
public String[] getBlockPoolList() {
- synchronized(FSDataset.this) {
+ synchronized(dataset) {
return map.keySet().toArray(new String[map.keySet().size()]);
}
}
-
+
/**
* Temporary files. They get moved to the finalized block directory when
* the block is finalized.
@@ -658,14 +676,17 @@ class FSDataset implements FSDatasetInte
return bp.addBlock(b, f);
}
+ /**
+ * This should be used only by {@link FSVolumeSet#checkDirs()}
+ * and it will be synchronized there.
+ */
void checkDirs() throws DiskErrorException {
// TODO:FEDERATION valid synchronization
- Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
- for (Entry<String, BlockPoolSlice> entry : set) {
- entry.getValue().checkDirs();
+ for(BlockPoolSlice s : map.values()) {
+ s.checkDirs();
}
}
-
+
void getVolumeMap(ReplicasMap volumeMap) throws IOException {
Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
for (Entry<String, BlockPoolSlice> entry : set) {
@@ -877,31 +898,25 @@ class FSDataset implements FSDatasetInte
// Make a copy of volumes for performing modification
final List<FSVolume> volumeList = new ArrayList<FSVolume>(volumes);
- for (int idx = 0; idx < volumeList.size(); idx++) {
- FSVolume fsv = volumeList.get(idx);
+ for(Iterator<FSVolume> i = volumeList.iterator(); i.hasNext(); ) {
+ final FSVolume fsv = i.next();
try {
fsv.checkDirs();
} catch (DiskErrorException e) {
DataNode.LOG.warn("Removing failed volume " + fsv + ": ",e);
if (removedVols == null) {
- removedVols = new ArrayList<FSVolume>(1);
+ removedVols = new ArrayList<FSVolume>(2);
}
removedVols.add(fsv);
fsv.shutdown();
- volumeList.set(idx, null); // Remove the volume
+ i.remove(); // Remove the volume
numFailedVolumes++;
}
}
- // Remove null volumes from the volumes array
if (removedVols != null && removedVols.size() > 0) {
- final List<FSVolume> newVols = new ArrayList<FSVolume>();
- for (FSVolume vol : volumeList) {
- if (vol != null) {
- newVols.add(vol);
- }
- }
- volumes = Collections.unmodifiableList(newVols); // Replace volume list
+ // Replace volume list
+ volumes = Collections.unmodifiableList(volumeList);
DataNode.LOG.info("Completed FSVolumeSet.checkDirs. Removed "
+ removedVols.size() + " volumes. List of current volumes: "
+ this);
@@ -1048,7 +1063,6 @@ class FSDataset implements FSDatasetInte
private final DataNode datanode;
final FSVolumeSet volumes;
- private final int maxBlocksPerDir;
final ReplicasMap volumeMap;
final FSDatasetAsyncDiskService asyncDiskService;
private final int validVolsRequired;
@@ -1056,20 +1070,12 @@ class FSDataset implements FSDatasetInte
// Used for synchronizing access to usage stats
private final Object statsLock = new Object();
- final boolean supportAppends;
-
/**
* An FSDataset has a directory where it loads its data files.
*/
private FSDataset(DataNode datanode, DataStorage storage, Configuration conf
) throws IOException {
this.datanode = datanode;
- this.maxBlocksPerDir =
- conf.getInt(DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
- DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
- this.supportAppends =
- conf.getBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
- DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
final int volFailuresTolerated =
@@ -1098,7 +1104,7 @@ class FSDataset implements FSDatasetInte
storage.getNumStorageDirs());
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
final File dir = storage.getStorageDir(idx).getCurrentDir();
- volArray.add(new FSVolume(dir, conf));
+ volArray.add(new FSVolume(this, dir, conf));
DataNode.LOG.info("FSDataset added volume - " + dir);
}
volumeMap = new ReplicasMap(this);