Author: hairong
Date: Thu Sep 25 12:06:01 2008
New Revision: 699056
URL: http://svn.apache.org/viewvc?rev=699056&view=rev
Log:
HADOOP-4116. Balancer should provide better resource management. Contributed by
Hairong Kuang.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=699056&r1=699055&r2=699056&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Sep 25 12:06:01 2008
@@ -20,6 +20,7 @@
OPTIMIZATIONS
BUG FIXES
+ HADOOP-4116. Balancer should provide better resource management. (hairong)
Release 0.19.0 - Unreleased
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=699056&r1=699055&r2=699056&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Thu Sep 25 12:06:01 2008
@@ -31,11 +31,15 @@
* when protocol changes. It is not very obvious.
*/
/*
- * Version 13:
- * Added a new operation, OP_BLOCK_CHECKSUM, for obtaining
- * the checksum of a block from a datanode.
+ * Version 14:
+ * OP_REPLACE_BLOCK is sent from the Balancer server to the destination,
+ * including the block id, source, and proxy.
+ * OP_COPY_BLOCK is sent from the destination to the proxy and contains
+ * only the block id.
+ * A reply to OP_COPY_BLOCK sends the block content.
+ * A reply to OP_REPLACE_BLOCK includes an operation status.
*/
- public static final int DATA_TRANSFER_VERSION = 13;
+ public static final int DATA_TRANSFER_VERSION = 14;
// Processed at datanode stream-handler
public static final byte OP_WRITE_BLOCK = (byte) 80;
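For readers of this change, the Version 14 handshake described in the comment above boils down to the wire sequence sketched below. This is only an illustrative sketch, not code from this commit; standard java.io and org.apache.hadoop.io.Text imports are assumed, and the sock, block, source, and proxySource variables are assumed to be set up by the caller (see Balancer.sendRequest() and TestBlockReplacement.replaceBlock() further down for the actual implementations).

    // Sketch: issue an OP_REPLACE_BLOCK request to the destination datanode.
    DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(sock.getOutputStream()));
    out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // 14
    out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK);       // op code
    out.writeLong(block.getBlockId());                          // block id
    out.writeLong(block.getGenerationStamp());                  // generation stamp
    Text.writeString(out, source.getStorageID());               // deletion hint
    proxySource.write(out);                                     // proxy datanode info
    out.flush();

    // The destination replies with a single status code once the move is done.
    DataInputStream reply = new DataInputStream(sock.getInputStream());
    if (reply.readShort() != DataTransferProtocol.OP_STATUS_SUCCESS) {
      throw new IOException("block move failed");
    }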
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=699056&r1=699055&r2=699056&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu Sep 25 12:06:01 2008
@@ -28,7 +28,6 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.net.SocketTimeoutException;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -180,6 +179,11 @@
LogFactory.getLog(Balancer.class.getName());
final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
+ /** The maximum number of concurrent block moves for
+ * balancing purposes at a datanode
+ */
+ public static final int MAX_NUM_CONCURRENT_MOVES = 5;
+
private Configuration conf;
private double threshold = 10D;
@@ -214,10 +218,10 @@
private double avgUtilization = 0.0D;
- final private int MOVER_THREAD_POOL_SIZE = 1000;
+ final static private int MOVER_THREAD_POOL_SIZE = 1000;
final private ExecutorService moverExecutor =
Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
- final private int DISPATCHER_THREAD_POOL_SIZE = 200;
+ final static private int DISPATCHER_THREAD_POOL_SIZE = 200;
final private ExecutorService dispatcherExecutor =
Executors.newFixedThreadPool(DISPATCHER_THREAD_POOL_SIZE);
@@ -262,11 +266,13 @@
this.block = block;
if ( chooseProxySource() ) {
addToMoved(block);
- LOG.info("Decided to move block "+ block.getBlockId()
- +" with a length of "+FsShell.byteDesc(block.getNumBytes())
- + " bytes from " + source.getName()
- + " to " + target.getName()
- + " using proxy source " + proxySource.getName() );
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Decided to move block "+ block.getBlockId()
+ +" with a length of "+FsShell.byteDesc(block.getNumBytes())
+ + " bytes from " + source.getName()
+ + " to " + target.getName()
+ + " using proxy source " + proxySource.getName() );
+ }
return true;
}
}
@@ -306,11 +312,9 @@
DataOutputStream out = null;
DataInputStream in = null;
try {
- sock.connect(DataNode.createSocketAddr(
- proxySource.datanode.getName()), HdfsConstants.READ_TIMEOUT);
- long bandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
- sock.setSoTimeout(2*HdfsConstants.READ_TIMEOUT+
- (int)(block.getNumBytes()*1500/bandwidth));
+ sock.connect(NetUtils.createSocketAddr(
+ target.datanode.getName()), HdfsConstants.READ_TIMEOUT);
+ sock.setKeepAlive(true);
out = new DataOutputStream( new BufferedOutputStream(
sock.getOutputStream(), FSConstants.BUFFER_SIZE));
sendRequest(out);
@@ -318,25 +322,17 @@
sock.getInputStream(), FSConstants.BUFFER_SIZE));
receiveResponse(in);
bytesMoved.inc(block.getNumBytes());
- if (LOG.isDebugEnabled()) {
- LOG.debug( "Moving block " + block.getBlock().getBlockId() +
+ LOG.info( "Moving block " + block.getBlock().getBlockId() +
" from "+ source.getName() + " to " +
target.getName() + " through " +
proxySource.getName() +
- " succeeded." );
- }
- } catch (SocketTimeoutException te) {
- LOG.warn("Timeout moving block "+block.getBlockId()+
- " from " + source.getName() + " to " +
- target.getName() + " through " +
- proxySource.getName());
+ " is succeeded." );
} catch (IOException e) {
LOG.warn("Error moving block "+block.getBlockId()+
" from " + source.getName() + " to " +
target.getName() + " through " +
proxySource.getName() +
- ": "+e.getMessage()+ "\n" +
- StringUtils.stringifyException(e) );
+ ": "+e.getMessage());
} finally {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
@@ -356,14 +352,14 @@
}
}
- /* Send a block copy request to the outputstream*/
+ /* Send a block replace request to the output stream*/
private void sendRequest(DataOutputStream out) throws IOException {
out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
- out.writeByte(DataTransferProtocol.OP_COPY_BLOCK);
+ out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK);
out.writeLong(block.getBlock().getBlockId());
out.writeLong(block.getBlock().getGenerationStamp());
Text.writeString(out, source.getStorageID());
- target.write(out);
+ proxySource.write(out);
out.flush();
}
@@ -371,11 +367,7 @@
private void receiveResponse(DataInputStream in) throws IOException {
short status = in.readShort();
if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
- throw new IOException("Moving block "+block.getBlockId()+
- " from "+source.getName() + " to " +
- target.getName() + " through " +
- proxySource.getName() +
- "failed");
+ throw new IOException("block move is failed");
}
}
@@ -391,8 +383,10 @@
private void scheduleBlockMove() {
moverExecutor.execute(new Runnable() {
public void run() {
- LOG.info("Starting moving "+ block.getBlockId() +
- " from " + proxySource.getName() + " to " + target.getName());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting moving "+ block.getBlockId() +
+ " from " + proxySource.getName() + " to " + target.getName());
+ }
dispatch();
}
});
@@ -482,8 +476,6 @@
/* A class that keeps track of a datanode in Balancer */
private static class BalancerDatanode implements Writable {
final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
- final protected static short MAX_NUM_CONCURRENT_MOVES =
- DataNode.MAX_BALANCING_THREADS;
protected DatanodeInfo datanode;
private double utilization;
protected long maxSizeToMove;
@@ -920,6 +912,9 @@
// compute average utilization
long totalCapacity=0L, totalUsedSpace=0L;
for (DatanodeInfo datanode : datanodes) {
+ if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
+ continue; // ignore decommissioning or decommissioned nodes
+ }
totalCapacity += datanode.getCapacity();
totalUsedSpace += datanode.getDfsUsed();
}
@@ -933,6 +928,9 @@
long overLoadedBytes = 0L, underLoadedBytes = 0L;
shuffleArray(datanodes);
for (DatanodeInfo datanode : datanodes) {
+ if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
+ continue; // ignore decommissioning or decommissioned nodes
+ }
cluster.add(datanode);
BalancerDatanode datanodeS;
if (getUtilization(datanode) > avgUtilization) {
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=699056&r1=699055&r2=699056&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Sep 25 12:06:01 2008
@@ -39,7 +39,6 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.Semaphore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -182,12 +181,6 @@
private static final Random R = new Random();
- // The following three fields are to support balancing
- public final static short MAX_BALANCING_THREADS = 5;
- protected Semaphore balancingSem = new Semaphore(MAX_BALANCING_THREADS);
- long balanceBandwidth;
- protected BlockTransferThrottler balancingThrottler;
-
// For InterDataNodeProtocol
public Server ipcServer;
@@ -328,11 +321,6 @@
this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval",
HEARTBEAT_INTERVAL) * 1000L;
DataNode.nameNodeAddr = nameNodeAddr;
- //set up parameter for cluster balancing
- this.balanceBandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
- LOG.info("Balancing bandwith is "+balanceBandwidth + " bytes/s");
- this.balancingThrottler = new BlockTransferThrottler(balanceBandwidth);
-
//initialize periodic block scanner
String reason = null;
if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=699056&r1=699055&r2=699056&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Sep 25 12:06:01 2008
@@ -475,69 +475,50 @@
// Read in the header
long blockId = in.readLong(); // read block id
Block block = new Block(blockId, 0, in.readLong());
- String source = Text.readString(in); // read del hint
- DatanodeInfo target = new DatanodeInfo(); // read target
- target.readFields(in);
- Socket targetSock = null;
- short opStatus = DataTransferProtocol.OP_STATUS_SUCCESS;
+ if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
+ LOG.info("Not able to copy block " + blockId + " to "
+ + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
+ return;
+ }
+
BlockSender blockSender = null;
- DataOutputStream targetOut = null;
+ DataOutputStream reply = null;
+ boolean isOpSuccess = true;
+
try {
- datanode.balancingSem.acquireUninterruptibly();
-
// check if the block exists or not
blockSender = new BlockSender(block, 0, -1, false, false, false,
datanode);
- // get the output stream to the target
- InetSocketAddress targetAddr = NetUtils.createSocketAddr(
- target.getName());
- targetSock = datanode.newSocket();
- targetSock.connect(targetAddr, datanode.socketTimeout);
- targetSock.setSoTimeout(datanode.socketTimeout);
-
- OutputStream baseStream = NetUtils.getOutputStream(targetSock,
- datanode.socketWriteTimeout);
- targetOut = new DataOutputStream(
- new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
-
- /* send request to the target */
- // fist write header info
- targetOut.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // transfer version
- targetOut.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK); // op code
- targetOut.writeLong(block.getBlockId()); // block id
- targetOut.writeLong(block.getGenerationStamp()); // block id
- Text.writeString( targetOut, source); // del hint
-
- // then send data
- long read = blockSender.sendBlock(targetOut, baseStream,
- datanode.balancingThrottler);
+ // set up response stream
+ OutputStream baseStream = NetUtils.getOutputStream(
+ s, datanode.socketWriteTimeout);
+ reply = new DataOutputStream(new BufferedOutputStream(
+ baseStream, SMALL_BUFFER_SIZE));
+
+ // send block content to the target
+ long read = blockSender.sendBlock(reply, baseStream,
+ dataXceiverServer.balanceThrottler);
datanode.myMetrics.bytesRead.inc((int) read);
datanode.myMetrics.blocksRead.inc();
- // check the response from target
- receiveResponse(targetSock, 1);
-
- LOG.info("Copied block " + block + " to " + targetAddr);
+ LOG.info("Copied block " + block + " to " + s.getRemoteSocketAddress());
} catch (IOException ioe) {
- opStatus = DataTransferProtocol.OP_STATUS_ERROR;
- LOG.warn("Got exception while serving " + block + " to "
- + target.getName() + ": " + StringUtils.stringifyException(ioe));
+ isOpSuccess = false;
throw ioe;
} finally {
- /* send response to the requester */
- try {
- sendResponse(s, opStatus, datanode.socketWriteTimeout);
- } catch (IOException replyE) {
- LOG.warn("Error writing the response back to "+
- s.getRemoteSocketAddress() + "\n" +
- StringUtils.stringifyException(replyE) );
+ dataXceiverServer.balanceThrottler.release();
+ if (isOpSuccess) {
+ try {
+ // send one last byte to indicate that the resource is cleaned.
+ reply.writeChar('d');
+ } catch (IOException ignored) {
+ }
}
- IOUtils.closeStream(targetOut);
+ IOUtils.closeStream(reply);
IOUtils.closeStream(blockSender);
- datanode.balancingSem.release();
}
}
@@ -549,68 +530,95 @@
* @throws IOException
*/
private void replaceBlock(DataInputStream in) throws IOException {
- datanode.balancingSem.acquireUninterruptibly();
-
/* read header */
- Block block = new Block(in.readLong(), dataXceiverServer.estimateBlockSize,
- in.readLong()); // block id & len
- String sourceID = Text.readString(in);
+ long blockId = in.readLong();
+ Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
+ in.readLong()); // block id & generation stamp
+ String sourceID = Text.readString(in); // read del hint
+ DatanodeInfo proxySource = new DatanodeInfo(); // read proxy source
+ proxySource.readFields(in);
+
+ if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
+ LOG.warn("Not able to receive block " + blockId + " from "
+ + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
+ sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR,
+ datanode.socketWriteTimeout);
+ return;
+ }
+ Socket proxySock = null;
+ DataOutputStream proxyOut = null;
short opStatus = DataTransferProtocol.OP_STATUS_SUCCESS;
BlockReceiver blockReceiver = null;
+ DataInputStream proxyReply = null;
+
try {
+ // get the output stream to the proxy
+ InetSocketAddress proxyAddr = NetUtils.createSocketAddr(
+ proxySource.getName());
+ proxySock = datanode.newSocket();
+ proxySock.connect(proxyAddr, datanode.socketTimeout);
+ proxySock.setSoTimeout(datanode.socketTimeout);
+
+ OutputStream baseStream = NetUtils.getOutputStream(proxySock,
+ datanode.socketWriteTimeout);
+ proxyOut = new DataOutputStream(
+ new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
+
+ /* send request to the proxy */
+ proxyOut.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // transfer version
+ proxyOut.writeByte(DataTransferProtocol.OP_COPY_BLOCK); // op code
+ proxyOut.writeLong(block.getBlockId()); // block id
+ proxyOut.writeLong(block.getGenerationStamp()); // generation stamp
+ proxyOut.flush();
+
+ // receive the response from the proxy
+ proxyReply = new DataInputStream(new BufferedInputStream(
+ NetUtils.getInputStream(proxySock), BUFFER_SIZE));
// open a block receiver and check if the block does not exist
- blockReceiver = new BlockReceiver(
- block, in, s.getRemoteSocketAddress().toString(),
- s.getLocalSocketAddress().toString(), false, "", null, datanode);
+ blockReceiver = new BlockReceiver(
+ block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
+ proxySock.getLocalSocketAddress().toString(),
+ false, "", null, datanode);
// receive a block
blockReceiver.receiveBlock(null, null, null, null,
- datanode.balancingThrottler, -1);
+ dataXceiverServer.balanceThrottler, -1);
// notify name node
datanode.notifyNamenodeReceivedBlock(block, sourceID);
LOG.info("Moved block " + block +
" from " + s.getRemoteSocketAddress());
+
} catch (IOException ioe) {
opStatus = DataTransferProtocol.OP_STATUS_ERROR;
throw ioe;
} finally {
+ // receive the last byte that indicates the proxy released its thread resource
+ if (opStatus == DataTransferProtocol.OP_STATUS_SUCCESS) {
+ try {
+ proxyReply.readChar();
+ } catch (IOException ignored) {
+ }
+ }
+
+ // now release the thread resource
+ dataXceiverServer.balanceThrottler.release();
+
// send response back
try {
sendResponse(s, opStatus, datanode.socketWriteTimeout);
} catch (IOException ioe) {
LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
}
+ IOUtils.closeStream(proxyOut);
IOUtils.closeStream(blockReceiver);
- datanode.balancingSem.release();
+ IOUtils.closeStream(proxyReply);
}
}
/**
- * Utility function for receiving a response.
- * @param s socket to read from
- * @param numTargets number of responses to read
- **/
- private void receiveResponse(Socket s, int numTargets) throws IOException {
- // check the response
- DataInputStream reply = new DataInputStream(new BufferedInputStream(
- NetUtils.getInputStream(s), BUFFER_SIZE));
- try {
- for (int i = 0; i < numTargets; i++) {
- short opStatus = reply.readShort();
- if(opStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
- throw new IOException("operation failed at "+
- s.getInetAddress());
- }
- }
- } finally {
- IOUtils.closeStream(reply);
- }
- }
-
- /**
* Utility function for sending a response.
* @param s socket to write to
* @param opStatus status message to write
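To make the new control flow in replaceBlock() easier to follow, here is a minimal sketch (not part of the commit) of the destination-to-proxy leg: the destination forwards an OP_COPY_BLOCK request to the proxy, receives the block content on the same connection, and finally reads one trailing character telling it the proxy has released its balancing thread. The proxySock and block variables are assumed to be already set up as in the method above.

    // Sketch: ask the proxy datanode for a copy of the block.
    DataOutputStream proxyOut = new DataOutputStream(new BufferedOutputStream(
        proxySock.getOutputStream()));
    proxyOut.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    proxyOut.writeByte(DataTransferProtocol.OP_COPY_BLOCK); // request names only the block
    proxyOut.writeLong(block.getBlockId());
    proxyOut.writeLong(block.getGenerationStamp());
    proxyOut.flush();

    // The proxy streams the block content back on this connection
    // (consumed by a BlockReceiver in the real code), then appends a
    // single character once it has released its own balancing thread.
    DataInputStream proxyReply = new DataInputStream(new BufferedInputStream(
        proxySock.getInputStream()));
    // ... receive the block via BlockReceiver.receiveBlock(...) ...
    proxyReply.readChar(); // 'd': proxy resources are cleaned up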
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=699056&r1=699055&r2=699056&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Thu Sep 25 12:06:01 2008
@@ -28,6 +28,7 @@
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
@@ -54,6 +55,45 @@
static final int MAX_XCEIVER_COUNT = 256;
int maxXceiverCount = MAX_XCEIVER_COUNT;
+ /** A manager to make sure that cluster balancing does not
+ * take up too many resources.
+ *
+ * It limits the number of block moves for balancing and
+ * the total amount of bandwidth they can use.
+ */
+ static class BlockBalanceThrottler extends BlockTransferThrottler {
+ private int numThreads;
+
+ /**Constructor
+ *
+ * @param bandwidth Total amount of bandwidth that can be used for balancing
+ */
+ private BlockBalanceThrottler(long bandwidth) {
+ super(bandwidth);
+ LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
+ }
+
+ /** Check if the block move can start.
+ *
+ * Return true if the thread quota is not exceeded and
+ * the counter is incremented; false otherwise.
+ */
+ synchronized boolean acquire() {
+ if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) {
+ return false;
+ }
+ numThreads++;
+ return true;
+ }
+
+ /** Mark that the move is completed. The thread counter is decremented. */
+ synchronized void release() {
+ numThreads--;
+ }
+ }
+
+ BlockBalanceThrottler balanceThrottler;
+
/**
* We need an estimate for block size to check if the disk partition has
* enough space. For now we set it to be the default block size set
@@ -75,6 +115,10 @@
MAX_XCEIVER_COUNT);
this.estimateBlockSize = conf.getLong("dfs.block.size",
DEFAULT_BLOCK_SIZE);
+
+ //set up parameter for cluster balancing
+ this.balanceThrottler = new BlockBalanceThrottler(
+ conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024));
}
/**
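The intended usage pattern of the new throttler, as seen in DataXceiver.copyBlock() and replaceBlock() above, is roughly the following sketch. This is illustrative only; xceiverServer stands in for the enclosing DataXceiverServer instance.

    if (!xceiverServer.balanceThrottler.acquire()) {
      // Balancer.MAX_NUM_CONCURRENT_MOVES (5) moves are already in flight:
      // refuse this move instead of queueing it.
      return;
    }
    try {
      // perform one block move, handing the throttler to
      // BlockSender/BlockReceiver so the transfer also honors
      // dfs.balance.bandwidthPerSec
    } finally {
      xceiverServer.balanceThrottler.release();
    }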
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=699056&r1=699055&r2=699056&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Thu Sep 25 12:06:01 2008
@@ -213,7 +213,7 @@
}
/* Copy a block from sourceProxy to destination. If the block becomes
- * overreplicated, preferrably remove it from source.
+ * over-replicated, preferably remove it from source.
*
* Return true if a block is successfully copied; otherwise false.
*/
@@ -221,16 +221,16 @@
DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
Socket sock = new Socket();
sock.connect(NetUtils.createSocketAddr(
- sourceProxy.getName()), HdfsConstants.READ_TIMEOUT);
- sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+ destination.getName()), HdfsConstants.READ_TIMEOUT);
+ sock.setKeepAlive(true);
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
- out.writeByte(DataTransferProtocol.OP_COPY_BLOCK);
+ out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK);
out.writeLong(block.getBlockId());
out.writeLong(block.getGenerationStamp());
Text.writeString(out, source.getStorageID());
- destination.write(out);
+ sourceProxy.write(out);
out.flush();
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());