Author: szetszwo Date: Wed Oct 16 23:06:00 2013 New Revision: 1532932 URL: http://svn.apache.org/r1532932 Log: HDFS-4376. Fix race conditions in Balancer. Contributed by Junping Du
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1532932&r1=1532931&r2=1532932&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Oct 16 23:06:00 2013 @@ -355,6 +355,8 @@ Release 2.3.0 - UNRELEASED HDFS-5283. Under construction blocks only inside snapshots should not be counted in safemode threshhold. (Vinay via szetszwo) + HDFS-4376. Fix race conditions in Balancer. (Junping Du via szetszwo) + Release 2.2.1 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1532932&r1=1532931&r2=1532932&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Wed Oct 16 23:06:00 2013 @@ -506,7 +506,7 @@ public class Balancer { final DatanodeInfo datanode; final double utilization; final long maxSize2Move; - protected long scheduledSize = 0L; + private long scheduledSize = 0L; // blocks being moved but not confirmed yet private List<PendingBlockMove> pendingBlocks = new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES); @@ -555,20 
+555,35 @@ public class Balancer { } /** Decide if still need to move more bytes */ - protected boolean hasSpaceForScheduling() { + protected synchronized boolean hasSpaceForScheduling() { return scheduledSize<maxSize2Move; } /** Return the total number of bytes that need to be moved */ - protected long availableSizeToMove() { + protected synchronized long availableSizeToMove() { return maxSize2Move-scheduledSize; } - /* increment scheduled size */ - protected void incScheduledSize(long size) { + /** increment scheduled size */ + protected synchronized void incScheduledSize(long size) { scheduledSize += size; } + /** decrement scheduled size */ + protected synchronized void decScheduledSize(long size) { + scheduledSize -= size; + } + + /** get scheduled size */ + protected synchronized long getScheduledSize(){ + return scheduledSize; + } + + /** set scheduled size */ + protected synchronized void setScheduledSize(long size){ + scheduledSize = size; + } + /* Check if the node can schedule more blocks to move */ synchronized private boolean isPendingQNotFull() { if ( pendingBlocks.size() < MAX_NUM_CONCURRENT_MOVES ) { @@ -702,8 +717,8 @@ public class Balancer { pendingBlock.source = this; pendingBlock.target = target; if ( pendingBlock.chooseBlockAndProxy() ) { - long blockSize = pendingBlock.block.getNumBytes(); - scheduledSize -= blockSize; + long blockSize = pendingBlock.block.getNumBytes(); + decScheduledSize(blockSize); task.size -= blockSize; if (task.size == 0) { tasks.remove(); @@ -747,10 +762,11 @@ public class Balancer { private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins private void dispatchBlocks() { long startTime = Time.now(); + long scheduledSize = getScheduledSize(); this.blocksToReceive = 2*scheduledSize; boolean isTimeUp = false; int noPendingBlockIteration = 0; - while(!isTimeUp && scheduledSize>0 && + while(!isTimeUp && getScheduledSize()>0 && (!srcBlockList.isEmpty() || blocksToReceive>0)) { PendingBlockMove pendingBlock = 
chooseNextBlockToMove(); if (pendingBlock != null) { @@ -779,7 +795,7 @@ public class Balancer { // in case no blocks can be moved for source node's task, // jump out of while-loop after 5 iterations. if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) { - scheduledSize = 0; + setScheduledSize(0); } } @@ -992,7 +1008,7 @@ public class Balancer { long bytesToMove = 0L; for (Source src : sources) { - bytesToMove += src.scheduledSize; + bytesToMove += src.getScheduledSize(); } return bytesToMove; } @@ -1093,7 +1109,7 @@ public class Balancer { bytesMoved += bytes; } - private long get() { + private synchronized long get() { return bytesMoved; } };