Author: hairong
Date: Thu Mar 5 18:16:49 2009
New Revision: 750533
URL: http://svn.apache.org/viewvc?rev=750533&view=rev
Log:
HADOOP-5145. Balancer sometimes runs out of memory after running days or weeks.
Contributed by Hairong Kuang.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=750533&r1=750532&r2=750533&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Mar 5 18:16:49 2009
@@ -269,9 +269,6 @@
HADOOP-5383. Avoid building an unused string in NameNode's
verifyReplication(). (Raghu Angadi)
- HADOOP-5384. Fix a problem that DataNodeCluster creates blocks with
- generationStamp == 1. (szetszwo)
-
Release 0.20.0 - Unreleased
INCOMPATIBLE CHANGES
@@ -938,6 +935,9 @@
HADOOP-5274. Fix gridmix2 dependency on wordcount example. (cdouglas)
+ HADOOP-5145. Balancer sometimes runs out of memory after running days or weeks.
+ (hairong)
+
Release 0.19.2 - Unreleased
BUG FIXES
@@ -951,6 +951,9 @@
HADOOP-4638. Fixes job recovery to not crash the job tracker for problems
with a single job file. (Amar Kamat via yhemanth)
+ HADOOP-5384. Fix a problem that DataNodeCluster creates blocks with
+ generationStamp == 1. (szetszwo)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified:
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=750533&r1=750532&r2=750533&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu Mar 5 18:16:49 2009
@@ -208,8 +208,7 @@
private Map<Block, BalancerBlock> globalBlockList
= new HashMap<Block, BalancerBlock>();
- private Map<Block, BalancerBlock> movedBlocks
- = new HashMap<Block, BalancerBlock>();
+ private MovedBlocks movedBlocks = new MovedBlocks();
private Map<String, BalancerDatanode> datanodes
= new HashMap<String, BalancerDatanode>();
@@ -264,7 +263,7 @@
if (isGoodBlockCandidate(source, target, block)) {
this.block = block;
if ( chooseProxySource() ) {
- addToMoved(block);
+ movedBlocks.add(block);
if (LOG.isDebugEnabled()) {
LOG.debug("Decided to move block "+ block.getBlockId()
+" with a length of "+StringUtils.byteDesc(block.getNumBytes())
@@ -697,7 +696,7 @@
private void filterMovedBlocks() {
for (Iterator<BalancerBlock> blocks=getBlockIterator();
blocks.hasNext();) {
- if (isMoved(blocks.next())) {
+ if (movedBlocks.contains(blocks.next())) {
blocks.remove();
}
}
@@ -1244,20 +1243,63 @@
} while (shouldWait);
}
- /* mark a block to be moved */
- private void addToMoved(BalancerBlock block) {
- synchronized(movedBlocks) {
- movedBlocks.put(block.getBlock(), block);
- }
- }
-
- /* check if a block is marked as moved */
- private boolean isMoved(BalancerBlock block) {
- synchronized(movedBlocks) {
- return movedBlocks.containsKey(block.getBlock());
+ /** This window makes sure to keep blocks that have been moved within 1.5 hour.
+ * Old window has blocks that are older;
+ * Current window has blocks that are more recent;
+ * Cleanup method triggers the check if blocks in the old window are
+ * more than 1.5 hour old. If yes, purge the old window and then
+ * move blocks in current window to old window.
+ */
+ private static class MovedBlocks {
+ private long lastCleanupTime = System.currentTimeMillis();
+ private static long winWidth = 5400*1000L; // 1.5 hour
+ final private static int CUR_WIN = 0;
+ final private static int OLD_WIN = 1;
+ final private static int NUM_WINS = 2;
+ final private List<HashMap<Block, BalancerBlock>> movedBlocks =
+ new ArrayList<HashMap<Block, BalancerBlock>>(NUM_WINS);
+
+ /* initialize the moved blocks collection */
+ private MovedBlocks() {
+ movedBlocks.add(new HashMap<Block,BalancerBlock>());
+ movedBlocks.add(new HashMap<Block,BalancerBlock>());
+ }
+
+ /* set the win width */
+ private void setWinWidth(Configuration conf) {
+ winWidth = conf.getLong(
+ "dfs.balancer.movedWinWidth", 5400*1000L);
+ }
+
+ /* add a block thus marking a block to be moved */
+ synchronized private void add(BalancerBlock block) {
+ movedBlocks.get(CUR_WIN).put(block.getBlock(), block);
+ }
+
+ /* check if a block is marked as moved */
+ synchronized private boolean contains(BalancerBlock block) {
+ return contains(block.getBlock());
+ }
+
+ /* check if a block is marked as moved */
+ synchronized private boolean contains(Block block) {
+ return movedBlocks.get(CUR_WIN).containsKey(block) ||
+ movedBlocks.get(OLD_WIN).containsKey(block);
+ }
+
+ /* remove old blocks */
+ synchronized private void cleanup() {
+ long curTime = System.currentTimeMillis();
+ // check if old win is older than winWidth
+ if (lastCleanupTime + winWidth <= curTime) {
+ // purge the old window
+ movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN));
+ movedBlocks.set(CUR_WIN, new HashMap<Block, BalancerBlock>());
+ lastCleanupTime = curTime;
+ }
}
}
-
+
/* Decide if it is OK to move the given block from source to target
* A block is a good candidate if
* 1. the block is not in the process of being moved/has not been moved;
@@ -1267,7 +1309,7 @@
private boolean isGoodBlockCandidate(Source source,
BalancerDatanode target, BalancerBlock block) {
// check if the block is moved or not
- if (isMoved(block)) {
+ if (movedBlocks.contains(block)) {
return false;
}
if (block.isLocatedOnDatanode(target)) {
@@ -1317,6 +1359,7 @@
this.targets.clear();
this.avgUtilization = 0.0D;
cleanGlobalBlockList();
+ this.movedBlocks.cleanup();
}
/* Remove all blocks from the global block list except for the ones in the
@@ -1326,7 +1369,7 @@
for (Iterator<Block> globalBlockListIterator=globalBlockList.keySet().iterator();
globalBlockListIterator.hasNext();) {
Block block = globalBlockListIterator.next();
- if(!movedBlocks.containsKey(block)) {
+ if(!movedBlocks.contains(block)) {
globalBlockListIterator.remove();
}
}
@@ -1461,9 +1504,11 @@
// close the output file
IOUtils.closeStream(out);
- try {
- fs.delete(BALANCER_ID_PATH, true);
- } catch(IOException ignored) {
+ if (fs != null) {
+ try {
+ fs.delete(BALANCER_ID_PATH, true);
+ } catch(IOException ignored) {
+ }
}
System.out.println("Balancing took " +
time2Str(Util.now()-startTime));
@@ -1528,6 +1573,7 @@
/** set this balancer's configuration */
public void setConf(Configuration conf) {
this.conf = conf;
+ movedBlocks.setWinWidth(conf);
}
}
Modified:
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=750533&r1=750532&r2=750533&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Thu Mar 5 18:16:49 2009
@@ -60,6 +60,7 @@
CONF.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
CONF.setLong("dfs.heartbeat.interval", 1L);
CONF.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+ CONF.setLong("dfs.balancer.movedWinWidth", 2000L);
Balancer.setBlockMoveWaitTime(1000L) ;
}