Author: hairong
Date: Tue Oct 28 12:13:39 2008
New Revision: 708637
URL: http://svn.apache.org/viewvc?rev=708637&view=rev
Log:
Revert the HADOOP-4116 patch on branch 0.18 because it caused an
incompatibility between 0.18.1 and 0.18.2.
Modified:
hadoop/core/branches/branch-0.18/CHANGES.txt
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSConstants.java
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=708637&r1=708636&r2=708637&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Tue Oct 28 12:13:39 2008
@@ -4,8 +4,6 @@
BUG FIXES
- HADOOP-4116. Balancer should provide better resource management. (hairong)
-
HADOOP-3614. Fix a bug that Datanode may use an old GenerationStamp to get
meta file. (szetszwo)
Modified:
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java?rev=708637&r1=708636&r2=708637&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java Tue Oct 28 12:13:39 2008
@@ -28,6 +28,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.SocketTimeoutException;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -173,11 +174,6 @@
LogFactory.getLog("org.apache.hadoop.dfs.Balancer");
final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
- /** The maximum number of concurrent blocks moves for
- * balancing purpose at a datanode
- */
- public static final int MAX_NUM_CONCURRENT_MOVES = 5;
-
private Configuration conf;
private double threshold = 10D;
@@ -212,10 +208,10 @@
private double avgUtilization = 0.0D;
- final static private int MOVER_THREAD_POOL_SIZE = 1000;
+ final private int MOVER_THREAD_POOL_SIZE = 1000;
final private ExecutorService moverExecutor =
Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
- final static private int DISPATCHER_THREAD_POOL_SIZE = 200;
+ final private int DISPATCHER_THREAD_POOL_SIZE = 200;
final private ExecutorService dispatcherExecutor =
Executors.newFixedThreadPool(DISPATCHER_THREAD_POOL_SIZE);
@@ -260,13 +256,11 @@
this.block = block;
if ( chooseProxySource() ) {
addToMoved(block);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Decided to move block "+ block.getBlockId()
- +" with a length of "+FsShell.byteDesc(block.getNumBytes())
- + " bytes from " + source.getName()
- + " to " + target.getName()
- + " using proxy source " + proxySource.getName() );
- }
+ LOG.info("Decided to move block "+ block.getBlockId()
+ +" with a length of "+FsShell.byteDesc(block.getNumBytes())
+ + " bytes from " + source.getName()
+ + " to " + target.getName()
+ + " using proxy source " + proxySource.getName() );
return true;
}
}
@@ -307,8 +301,10 @@
DataInputStream in = null;
try {
sock.connect(DataNode.createSocketAddr(
- target.datanode.getName()), FSConstants.READ_TIMEOUT);
- sock.setKeepAlive(true);
+ proxySource.datanode.getName()), FSConstants.READ_TIMEOUT);
+ long bandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
+ sock.setSoTimeout(2*FSConstants.READ_TIMEOUT+
+ (int)(block.getNumBytes()*1500/bandwidth));
out = new DataOutputStream( new BufferedOutputStream(
sock.getOutputStream(), FSConstants.BUFFER_SIZE));
sendRequest(out);
@@ -316,17 +312,25 @@
sock.getInputStream(), FSConstants.BUFFER_SIZE));
receiveResponse(in);
bytesMoved.inc(block.getNumBytes());
- LOG.info( "Moving block " + block.getBlock().getBlockId() +
+ if (LOG.isDebugEnabled()) {
+ LOG.debug( "Moving block " + block.getBlock().getBlockId() +
" from "+ source.getName() + " to " +
target.getName() + " through " +
proxySource.getName() +
- " is succeeded." );
+ " succeeded." );
+ }
+ } catch (SocketTimeoutException te) {
+ LOG.warn("Timeout moving block "+block.getBlockId()+
+ " from " + source.getName() + " to " +
+ target.getName() + " through " +
+ proxySource.getName());
} catch (IOException e) {
LOG.warn("Error moving block "+block.getBlockId()+
" from " + source.getName() + " to " +
target.getName() + " through " +
proxySource.getName() +
- ": "+e.getMessage());
+ ": "+e.getMessage()+ "\n" +
+ StringUtils.stringifyException(e) );
} finally {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
@@ -349,11 +353,11 @@
/* Send a block copy request to the outputstream*/
private void sendRequest(DataOutputStream out) throws IOException {
out.writeShort(FSConstants.DATA_TRANSFER_VERSION);
- out.writeByte(FSConstants.OP_REPLACE_BLOCK);
+ out.writeByte(FSConstants.OP_COPY_BLOCK);
out.writeLong(block.getBlock().getBlockId());
out.writeLong(block.getBlock().getGenerationStamp());
Text.writeString(out, source.getStorageID());
- proxySource.write(out);
+ target.write(out);
out.flush();
}
@@ -361,7 +365,11 @@
private void receiveResponse(DataInputStream in) throws IOException {
short status = in.readShort();
if (status != FSConstants.OP_STATUS_SUCCESS) {
- throw new IOException("block move is failed.");
+ throw new IOException("Moving block "+block.getBlockId()+
+ " from "+source.getName() + " to " +
+ target.getName() + " through " +
+ proxySource.getName() +
+ "failed");
}
}
@@ -377,10 +385,8 @@
private void scheduleBlockMove() {
moverExecutor.execute(new Runnable() {
public void run() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Starting moving "+ block.getBlockId() +
- " from " + proxySource.getName() + " to " + target.getName());
- }
+ LOG.info("Starting moving "+ block.getBlockId() +
+ " from " + proxySource.getName() + " to " + target.getName());
dispatch();
}
});
@@ -470,6 +476,8 @@
/* A class that keeps track of a datanode in Balancer */
private static class BalancerDatanode implements Writable {
final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
+ final protected static short MAX_NUM_CONCURRENT_MOVES =
+ DataNode.MAX_BALANCING_THREADS;
protected DatanodeInfo datanode;
private double utilization;
protected long maxSizeToMove;
@@ -906,9 +914,6 @@
// compute average utilization
long totalCapacity=0L, totalUsedSpace=0L;
for (DatanodeInfo datanode : datanodes) {
- if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
- continue; // ignore decommissioning or decommissioned nodes
- }
totalCapacity += datanode.getCapacity();
totalUsedSpace += datanode.getDfsUsed();
}
@@ -922,9 +927,6 @@
long overLoadedBytes = 0L, underLoadedBytes = 0L;
shuffleArray(datanodes);
for (DatanodeInfo datanode : datanodes) {
- if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
- continue; // ignore decommissioning or decommissioned nodes
- }
cluster.add(datanode);
BalancerDatanode datanodeS;
if (getUtilization(datanode) > avgUtilization) {
Modified:
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java?rev=708637&r1=708636&r2=708637&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java Tue Oct 28 12:13:39 2008
@@ -32,7 +32,6 @@
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.dfs.IncorrectVersionException;
import org.apache.hadoop.mapred.StatusHttpServer;
-import org.apache.hadoop.dfs.Balancer;
import org.apache.hadoop.dfs.BlockCommand;
import org.apache.hadoop.dfs.DatanodeProtocol;
import org.apache.hadoop.dfs.FSDatasetInterface.MetaDataInputStream;
@@ -46,6 +45,7 @@
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;
+import java.util.concurrent.Semaphore;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
@@ -146,45 +146,6 @@
private static final int MAX_XCEIVER_COUNT = 256;
private int maxXceiverCount = MAX_XCEIVER_COUNT;
- /** A manager to make sure that cluster balancing does not
- * take too much resources.
- *
- * It limits the number of block moves for balancing and
- * the total amount of bandwidth they can use.
- */
- private static class BlockBalanceThrottler extends Throttler {
- private int numThreads;
-
- /**Constructor
- *
- * @param bandwidth Total amount of bandwidth can be used for balancing
- */
- private BlockBalanceThrottler(long bandwidth) {
- super(bandwidth);
- LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
- }
-
- /** Check if the block move can start.
- *
- * Return true if the thread quota is not exceeded and
- * the counter is incremented; False otherwise.
- */
- private synchronized boolean acquire() {
- if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) {
- return false;
- }
- numThreads++;
- return true;
- }
-
- /** Mark that the move is completed. The thread counter is decremented. */
- private synchronized void release() {
- numThreads--;
- }
- }
-
- private BlockBalanceThrottler balanceThrottler;
-
/**
* We need an estimate for block size to check if the disk partition has
* enough space. For now we set it to be the default block size set
@@ -195,6 +156,12 @@
*/
private long estimateBlockSize;
+ // The following three fields are to support balancing
+ final static short MAX_BALANCING_THREADS = 5;
+ private Semaphore balancingSem = new Semaphore(MAX_BALANCING_THREADS);
+ long balanceBandwidth;
+ private Throttler balancingThrottler;
+
// For InterDataNodeProtocol
Server ipcServer;
@@ -341,8 +308,9 @@
DataNode.nameNodeAddr = nameNodeAddr;
//set up parameter for cluster balancing
- this.balanceThrottler = new BlockBalanceThrottler(
- conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024));
+ this.balanceBandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
+ LOG.info("Balancing bandwith is "+balanceBandwidth + " bytes/s");
+ this.balancingThrottler = new Throttler(balanceBandwidth);
//initialize periodic block scanner
String reason = null;
@@ -916,6 +884,24 @@
}
}
+ /* utility function for receiving a response */
+ private static void receiveResponse(Socket s, int numTargets) throws IOException {
+ // check the response
+ DataInputStream reply = new DataInputStream(new BufferedInputStream(
+ NetUtils.getInputStream(s), BUFFER_SIZE));
+ try {
+ for (int i = 0; i < numTargets; i++) {
+ short opStatus = reply.readShort();
+ if(opStatus != OP_STATUS_SUCCESS) {
+ throw new IOException("operation failed at "+
+ s.getInetAddress());
+ }
+ }
+ } finally {
+ IOUtils.closeStream(reply);
+ }
+ }
+
/* utility function for sending a respose */
private static void sendResponse(Socket s, short opStatus, long timeout)
throws IOException {
@@ -959,7 +945,6 @@
this.ss = ss;
}
-
/**
*/
public void run() {
@@ -1375,50 +1360,67 @@
// Read in the header
long blockId = in.readLong(); // read block id
Block block = new Block(blockId, 0, in.readLong());
+ String source = Text.readString(in); // read del hint
+ DatanodeInfo target = new DatanodeInfo(); // read target
+ target.readFields(in);
- if (!balanceThrottler.acquire()) { // not able to start
- LOG.info("Not able to copy block " + blockId + " to "
- + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
- return;
- }
-
+ Socket targetSock = null;
+ short opStatus = OP_STATUS_SUCCESS;
BlockSender blockSender = null;
- DataOutputStream reply = null;
- boolean isOpSuccess = true;
-
+ DataOutputStream targetOut = null;
try {
+ balancingSem.acquireUninterruptibly();
+
// check if the block exists or not
blockSender = new BlockSender(block, 0, -1, false, false, false);
- // set up response stream
- OutputStream baseStream = NetUtils.getOutputStream(
- s, socketWriteTimeout);
- reply = new DataOutputStream(new BufferedOutputStream(
- baseStream, SMALL_BUFFER_SIZE));
-
+ // get the output stream to the target
+ InetSocketAddress targetAddr = NetUtils.createSocketAddr(target.getName());
+ targetSock = newSocket();
+ targetSock.connect(targetAddr, socketTimeout);
+ targetSock.setSoTimeout(socketTimeout);
+
+ OutputStream baseStream = NetUtils.getOutputStream(targetSock,
+ socketWriteTimeout);
+ targetOut = new DataOutputStream(
+ new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
- // send block content to the target
- long read = blockSender.sendBlock(reply, baseStream,
- balanceThrottler);
+ /* send request to the target */
+ // fist write header info
+ targetOut.writeShort(DATA_TRANSFER_VERSION); // transfer version
+ targetOut.writeByte(OP_REPLACE_BLOCK); // op code
+ targetOut.writeLong(block.getBlockId()); // block id
+ targetOut.writeLong(block.getGenerationStamp()); // block id
+ Text.writeString( targetOut, source); // del hint
+
+ // then send data
+ long read = blockSender.sendBlock(targetOut, baseStream,
+ balancingThrottler);
myMetrics.bytesRead.inc((int) read);
myMetrics.blocksRead.inc();
- LOG.info("Copied block " + block + " to " + s.getRemoteSocketAddress());
+ // check the response from target
+ receiveResponse(targetSock, 1);
+
+ LOG.info("Copied block " + block + " to " + targetAddr);
} catch (IOException ioe) {
- isOpSuccess = false;
+ opStatus = OP_STATUS_ERROR;
+ LOG.warn("Got exception while serving " + block + " to "
+ + target.getName() + ": " + StringUtils.stringifyException(ioe));
throw ioe;
} finally {
- balanceThrottler.release();
- if (isOpSuccess) {
- try {
- // send one last byte to indicate that the resource is cleaned.
- reply.writeChar('d');
- } catch (IOException ignored) {
- }
+ /* send response to the requester */
+ try {
+ sendResponse(s, opStatus, socketWriteTimeout);
+ } catch (IOException replyE) {
+ LOG.warn("Error writing the response back to "+
+ s.getRemoteSocketAddress() + "\n" +
+ StringUtils.stringifyException(replyE) );
}
- IOUtils.closeStream(reply);
+ IOUtils.closeStream(targetOut);
IOUtils.closeStream(blockSender);
+ balancingSem.release();
}
}
@@ -1431,59 +1433,21 @@
* @throws IOException
*/
private void replaceBlock(DataInputStream in) throws IOException {
- /* read header */
- long blockId = in.readLong();
- Block block = new Block(blockId, estimateBlockSize,
- in.readLong()); // block id & generation stamp
- String sourceID = Text.readString(in); // read del hint
- DatanodeInfo proxySource = new DatanodeInfo(); // read proxy source
- proxySource.readFields(in);
-
- if (!balanceThrottler.acquire()) { // not able to start
- LOG.warn("Not able to receive block " + blockId + " from "
- + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
- sendResponse(s, (short)OP_STATUS_ERROR,
- socketWriteTimeout);
- return;
- }
-
- Socket proxySock = null;
- DataOutputStream proxyOut = null;
-
- short opStatus = OP_STATUS_SUCCESS;
- BlockReceiver blockReceiver = null;
- DataInputStream proxyReply = null;
-
- try {
- // get the output stream to the proxy
- InetSocketAddress proxyAddr = NetUtils.createSocketAddr(
- proxySource.getName());
- proxySock = newSocket();
- proxySock.connect(proxyAddr, socketTimeout);
- proxySock.setSoTimeout(socketTimeout);
-
- OutputStream baseStream = NetUtils.getOutputStream(proxySock,
- socketWriteTimeout);
- proxyOut = new DataOutputStream(
- new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
-
- /* send request to the proxy */
- proxyOut.writeShort(DATA_TRANSFER_VERSION); // transfer version
- proxyOut.writeByte(OP_COPY_BLOCK); // op code
- proxyOut.writeLong(block.getBlockId()); // block id
- proxyOut.writeLong(block.getGenerationStamp()); // block id
- proxyOut.flush();
-
- // receive the response from the proxy
- proxyReply = new DataInputStream(new BufferedInputStream(
- NetUtils.getInputStream(proxySock), BUFFER_SIZE));
+ balancingSem.acquireUninterruptibly();
+
+ /* read header */
+ Block block = new Block(in.readLong(), estimateBlockSize, in.readLong()); // block id & len
+ String sourceID = Text.readString(in);
+
+ short opStatus = OP_STATUS_SUCCESS;
+ BlockReceiver blockReceiver = null;
+ try {
// open a block receiver and check if the block does not exist
- blockReceiver = new BlockReceiver(
- block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
- false, "", null);
+ blockReceiver = new BlockReceiver(
+ block, in, s.getRemoteSocketAddress().toString(), false, "", null);
// receive a block
- blockReceiver.receiveBlock(null, null, null, null, balanceThrottler, -1);
+ blockReceiver.receiveBlock(null, null, null, null, balancingThrottler, -1);
// notify name node
notifyNamenodeReceivedBlock(block, sourceID);
@@ -1494,26 +1458,14 @@
opStatus = OP_STATUS_ERROR;
throw ioe;
} finally {
- // receive the last byte that indicates the proxy released its thread resource
- if (opStatus == OP_STATUS_SUCCESS) {
- try {
- proxyReply.readChar();
- } catch (IOException ignored) {
- }
- }
-
- // now release the thread resource
- balanceThrottler.release();
-
// send response back
try {
sendResponse(s, opStatus, socketWriteTimeout);
} catch (IOException ioe) {
LOG.warn("Error writing reply back to " +
s.getRemoteSocketAddress());
}
- IOUtils.closeStream(proxyOut);
IOUtils.closeStream(blockReceiver);
- IOUtils.closeStream(proxyReply);
+ balancingSem.release();
}
}
}
Modified:
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSConstants.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSConstants.java?rev=708637&r1=708636&r2=708637&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSConstants.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSConstants.java Tue Oct 28 12:13:39 2008
@@ -100,15 +100,12 @@
* This should change when serialization of DatanodeInfo, not just
* when protocol changes. It is not very obvious.
*/
- /* Version 14:
- * OP_REPLACE_BLOCK is sent from the Balancer server to the destination,
- * including the block id, source, and proxy.
- * OP_COPY_BLOCK is sent from the destination to the proxy, which contains
- * only the block id.
- * A reply to OP_COPY_BLOCK sends the block content.
- * A reply to OP_REPLACE_BLOCK includes an operation status.
+ /*
+ * Version 11:
+ * OP_WRITE_BLOCK sends a boolean. If its value is true, an additonal
+ * DatanodeInfo of client requesting transfer is also sent.
*/
- public static final int DATA_TRANSFER_VERSION = 14;
+ public static final int DATA_TRANSFER_VERSION = 11;
// Return codes for file create
public static final int OPERATION_FAILED = 0;
Modified:
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java?rev=708637&r1=708636&r2=708637&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java (original)
+++ hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java Tue Oct 28 12:13:39 2008
@@ -203,7 +203,7 @@
}
/* Copy a block from sourceProxy to destination. If the block becomes
- * over-replicated, preferably remove it from source.
+ * overreplicated, preferrably remove it from source.
*
* Return true if a block is successfully copied; otherwise false.
*/
@@ -211,16 +211,16 @@
DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
Socket sock = new Socket();
sock.connect(NetUtils.createSocketAddr(
- destination.getName()), FSConstants.READ_TIMEOUT);
- sock.setKeepAlive(true);
+ sourceProxy.getName()), FSConstants.READ_TIMEOUT);
+ sock.setSoTimeout(FSConstants.READ_TIMEOUT);
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
out.writeShort(FSConstants.DATA_TRANSFER_VERSION);
- out.writeByte(FSConstants.OP_REPLACE_BLOCK);
+ out.writeByte(FSConstants.OP_COPY_BLOCK);
out.writeLong(block.getBlockId());
out.writeLong(block.getGenerationStamp());
Text.writeString(out, source.getStorageID());
- sourceProxy.write(out);
+ destination.write(out);
out.flush();
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());