Author: rangadi
Date: Thu Apr 24 17:10:05 2008
New Revision: 651465
URL: http://svn.apache.org/viewvc?rev=651465&view=rev
Log:
HADOOP-3164. Reduce DataNode CPU usage by using FileChannel.transferTo().
On Linux the DataNode uses roughly one fifth the CPU while serving data.
Results may vary on other platforms.
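
For background, the win comes from FileChannel.transferTo(), which asks the
kernel to move bytes from a file to a socket directly instead of copying them
through a user-space buffer. A minimal standalone sketch of the two copy
strategies (illustrative only, not part of this patch; the class and method
names below are made up):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.channels.FileChannel;
    import java.nio.channels.WritableByteChannel;

    public class TransferToSketch {

      // Conventional copy: every byte crosses into user space and back,
      // costing extra CPU and memory bandwidth.
      static void copyWithBuffer(InputStream in, OutputStream out)
          throws IOException {
        byte[] buf = new byte[64 * 1024];
        int n;
        while ((n = in.read(buf)) > 0) {
          out.write(buf, 0, n);
        }
      }

      // Zero-copy: the kernel transfers file bytes to the target channel
      // directly. This is the mechanism HADOOP-3164 exploits.
      static void copyWithTransferTo(FileInputStream in,
                                     WritableByteChannel out)
          throws IOException {
        FileChannel ch = in.getChannel();
        long pos = ch.position();
        long remaining = ch.size() - pos;
        while (remaining > 0) {
          // On a blocking target channel transferTo() makes progress or
          // blocks, so this loop terminates.
          long n = ch.transferTo(pos, remaining, out);
          pos += n;
          remaining -= n;
        }
      }
    }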
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketIOWithTimeout.java
hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java
hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestPread.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=651465&r1=651464&r2=651465&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Apr 24 17:10:05 2008
@@ -49,6 +49,10 @@
HADOOP-1979. Speed up fsck by adding a buffered stream. (Lohit
Vijaya Renu via omalley)
+ HADOOP-3164. Reduce DataNode CPU usage by using FileChannel.transferTo().
+ On Linux the DataNode uses roughly one fifth the CPU while serving data.
+ Results may vary on other platforms.
+
BUG FIXES
HADOOP-2905. 'fsck -move' triggers NPE in NameNode.
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java?rev=651465&r1=651464&r2=651465&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java Thu Apr 24 17:10:05 2008
@@ -382,7 +382,7 @@
DataOutputStream out =
new DataOutputStream(new IOUtils.NullOutputStream());
- blockSender.sendBlock(out, throttler);
+ blockSender.sendBlock(out, null, throttler);
LOG.info((second ? "Second " : "") +
"Verification succeeded for " + block);
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=651465&r1=651464&r2=651465&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu Apr 24 17:10:05 2008
@@ -27,6 +27,7 @@
import org.apache.hadoop.conf.*;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.*;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -39,6 +40,7 @@
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;
@@ -89,6 +91,13 @@
return NetUtils.createSocketAddr(target);
}
+ /**
+ * Minimum buffer used while sending data to clients. Used only if
+ * transferTo() is enabled. 64KB is not that large; it could be larger, but
+ * it is unclear how much a larger buffer would help.
+ */
+ private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
+
DatanodeProtocol namenode = null;
FSDatasetInterface data = null;
DatanodeRegistration dnRegistration = null;
@@ -120,6 +129,7 @@
int defaultBytesPerChecksum = 512;
private int socketTimeout;
private int socketWriteTimeout = 0;
+ private boolean transferToAllowed = true;
private DataBlockScanner blockScanner;
private Daemon blockScannerThread;
@@ -208,7 +218,10 @@
FSConstants.READ_TIMEOUT);
this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
FSConstants.WRITE_TIMEOUT);
-
+ /* Based on results on different platforms, we might need to set the default
+ * to false on some of them. */
+ this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed", true);
String address =
NetUtils.getServerAddress(conf,
"dfs.datanode.bindAddress",
@@ -1022,8 +1035,9 @@
long length = in.readLong();
// send the block
- DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
- NetUtils.getOutputStream(s, socketWriteTimeout), SMALL_BUFFER_SIZE));
+ OutputStream baseStream = NetUtils.getOutputStream(s, socketWriteTimeout);
+ DataOutputStream out = new DataOutputStream(
+ new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
BlockSender blockSender = null;
try {
@@ -1036,7 +1050,7 @@
}
out.writeShort(DataNode.OP_STATUS_SUCCESS); // send op status
- long read = blockSender.sendBlock(out, null); // send data
+ long read = blockSender.sendBlock(out, baseStream, null); // send data
if (blockSender.isBlockReadFully()) {
// See if client verification succeeded.
@@ -1306,9 +1320,10 @@
targetSock.connect(targetAddr, socketTimeout);
targetSock.setSoTimeout(socketTimeout);
- targetOut = new DataOutputStream(new BufferedOutputStream(
- NetUtils.getOutputStream(targetSock, socketWriteTimeout),
- SMALL_BUFFER_SIZE));
+ OutputStream baseStream = NetUtils.getOutputStream(targetSock, socketWriteTimeout);
+ targetOut = new DataOutputStream(
+ new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
/* send request to the target */
// first write header info
@@ -1318,7 +1333,8 @@
Text.writeString( targetOut, source); // del hint
// then send data
- long read = blockSender.sendBlock(targetOut, balancingThrottler);
+ long read = blockSender.sendBlock(targetOut, baseStream,
+ balancingThrottler);
myMetrics.bytesRead.inc((int) read);
myMetrics.blocksRead.inc();
@@ -1567,6 +1583,7 @@
class BlockSender implements java.io.Closeable {
private Block block; // the block to read from
private InputStream blockIn; // data stream
+ private long blockInPosition = -1; // updated while using transferTo().
private DataInputStream checksumIn; // checksum datastream
private DataChecksum checksum; // checksum stream
private long offset; // starting position to read
@@ -1581,7 +1598,6 @@
private boolean blockReadFully; //set when the whole block is read
private boolean verifyChecksum; //if true, check is verified while reading
private Throttler throttler;
- private OutputStream out;
static final int PKT_HEADER_LEN = ( 4 + /* PacketLen */
8 + /* offset in block */
@@ -1705,8 +1721,14 @@
/**
* Sends up to maxChunks chunks of data.
+ *
+ * When blockInPosition is >= 0, assumes 'out' is a
+ * {@link SocketOutputStream} and tries
+ * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
+ * send data (and updates blockInPosition).
*/
- private int sendChunks(ByteBuffer pkt, int maxChunks) throws IOException {
+ private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
+ throws IOException {
// Sends multiple chunks in one packet with a single write().
int len = Math.min((int) (endOffset - offset),
@@ -1750,28 +1772,44 @@
}
int dataOff = checksumOff + checksumLen;
- IOUtils.readFully(blockIn, buf, dataOff, len);
- if (verifyChecksum) {
- int dOff = dataOff;
- int cOff = checksumOff;
- int dLeft = len;
+ if (blockInPosition >= 0) {
+ //use transferTo(). Checks on out and blockIn are already done.
- for (int i=0; i<numChunks; i++) {
- checksum.reset();
- int dLen = Math.min(dLeft, bytesPerChecksum);
- checksum.update(buf, dOff, dLen);
- if (!checksum.compare(buf, cOff)) {
- throw new ChecksumException("Checksum failed at " +
- (offset + len - dLeft), len);
+ SocketOutputStream sockOut = (SocketOutputStream)out;
+ //first write the packet
+ sockOut.write(buf, 0, dataOff);
+ // no need to flush, since we know 'out' is not a buffered stream.
+
+ sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
+ blockInPosition, len);
+
+ blockInPosition += len;
+ } else {
+ //normal transfer
+ IOUtils.readFully(blockIn, buf, dataOff, len);
+
+ if (verifyChecksum) {
+ int dOff = dataOff;
+ int cOff = checksumOff;
+ int dLeft = len;
+
+ for (int i=0; i<numChunks; i++) {
+ checksum.reset();
+ int dLen = Math.min(dLeft, bytesPerChecksum);
+ checksum.update(buf, dOff, dLen);
+ if (!checksum.compare(buf, cOff)) {
+ throw new ChecksumException("Checksum failed at " +
+ (offset + len - dLeft), len);
+ }
+ dLeft -= dLen;
+ dOff += dLen;
+ cOff += checksumSize;
}
- dLeft -= dLen;
- dOff += dLen;
- cOff += checksumSize;
}
- }
- out.write(buf, 0, dataOff + len);
+ out.write(buf, 0, dataOff + len);
+ }
if (throttler != null) { // rebalancing so throttle
throttler.throttle(packetLen);
@@ -1785,32 +1823,64 @@
* either a client or to another datanode.
*
* @param out stream to which the block is written to
- * returns total bytes reads, including crc.
+ * @param baseStream optional. If non-null, <code>out</code> is assumed to
+ * be a wrapper over this stream. This enables optimizations for
+ * sending the data, e.g.
+ * {@link SocketOutputStream#transferToFully(FileChannel, long, int)}.
+ * @param throttler for sending data.
+ * @return total bytes read, including crc.
*/
- long sendBlock(DataOutputStream out, Throttler throttler)
- throws IOException {
+ long sendBlock(DataOutputStream out, OutputStream baseStream,
+ Throttler throttler) throws IOException {
if( out == null ) {
throw new IOException( "out stream is null" );
}
- this.out = out;
this.throttler = throttler;
long initialOffset = offset;
long totalRead = 0;
+ OutputStream streamForSendChunks = out;
+
try {
checksum.writeHeader(out);
if ( chunkOffsetOK ) {
out.writeLong( offset );
}
- //set up sendBuf:
- int maxChunksPerPacket = Math.max(1,
- (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
- ByteBuffer pktBuf = ByteBuffer.allocate(PKT_HEADER_LEN +
- (bytesPerChecksum + checksumSize) * maxChunksPerPacket);
+ out.flush();
+
+ int maxChunksPerPacket;
+ int pktSize;
+
+ if (transferToAllowed && !verifyChecksum &&
+ baseStream instanceof SocketOutputStream &&
+ blockIn instanceof FileInputStream) {
+
+ FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
+
+ // blockInPosition also indicates sendChunks() uses transferTo.
+ blockInPosition = fileChannel.position();
+ streamForSendChunks = baseStream;
+
+ // ensure a minimum buffer size.
+ maxChunksPerPacket = (Math.max(BUFFER_SIZE,
+ MIN_BUFFER_WITH_TRANSFERTO)
+ + bytesPerChecksum - 1)/bytesPerChecksum;
+
+ // allocate smaller buffer while using transferTo().
+ pktSize = PKT_HEADER_LEN + checksumSize * maxChunksPerPacket;
+ } else {
+ maxChunksPerPacket = Math.max(1,
+ (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
+ pktSize = PKT_HEADER_LEN +
+ (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
+ }
+ ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
while (endOffset > offset) {
- long len = sendChunks(pktBuf, maxChunksPerPacket);
+ long len = sendChunks(pktBuf, maxChunksPerPacket,
+ streamForSendChunks);
offset += len;
totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
checksumSize);
@@ -2606,8 +2676,9 @@
long writeTimeout = socketWriteTimeout +
WRITE_TIMEOUT_EXTENSION * (targets.length-1);
- out = new DataOutputStream(new BufferedOutputStream(
- NetUtils.getOutputStream(sock, writeTimeout), SMALL_BUFFER_SIZE));
+ OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
+ out = new DataOutputStream(new BufferedOutputStream(baseStream,
+ SMALL_BUFFER_SIZE));
blockSender = new BlockSender(b, 0, -1, false, false, false);
@@ -2626,7 +2697,7 @@
targets[i].write(out);
}
// send data & checksum
- blockSender.sendBlock(out, null);
+ blockSender.sendBlock(out, baseStream, null);
// no response necessary
LOG.info(dnRegistration + ":Transmitted block " + b + " to " +
curTarget);
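
For a concrete sense of the buffer savings in sendBlock() above, the sketch
below evaluates both sizing branches. bytesPerChecksum = 512 with a 4-byte
CRC matches the defaults visible in this patch; BUFFER_SIZE = 4096 and
PKT_HEADER_LEN = 17 are assumptions for illustration only, not values
confirmed by this diff:

    // Standalone sketch; not DataNode code.
    public class PacketSizeSketch {
      public static void main(String[] args) {
        final int BUFFER_SIZE = 4096;                  // assumed
        final int PKT_HEADER_LEN = 17;                 // assumed
        final int MIN_BUFFER_WITH_TRANSFERTO = 64 * 1024;
        final int bytesPerChecksum = 512;
        final int checksumSize = 4;                    // CRC32

        // transferTo() path: the packet buffer holds only header and
        // checksums; the kernel moves the data itself.
        int chunksT = (Math.max(BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO)
                       + bytesPerChecksum - 1) / bytesPerChecksum;    // 128
        int pktT = PKT_HEADER_LEN + checksumSize * chunksT;           // 529

        // Normal path: the packet buffer holds data plus checksums.
        int chunksN = Math.max(1,
            (BUFFER_SIZE + bytesPerChecksum - 1) / bytesPerChecksum); // 8
        int pktN = PKT_HEADER_LEN
                   + (bytesPerChecksum + checksumSize) * chunksN;     // 4145

        System.out.println("transferTo buffer: " + pktT + " bytes");
        System.out.println("normal buffer:     " + pktN + " bytes");
      }
    }

With these numbers the transferTo() path covers 64KB of block data with a
packet buffer of only about 529 bytes, while the normal path needs about
4145 bytes to cover just 4KB.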
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketIOWithTimeout.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketIOWithTimeout.java?rev=651465&r1=651464&r2=651465&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketIOWithTimeout.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketIOWithTimeout.java Thu Apr 24 17:10:05 2008
@@ -159,22 +159,45 @@
}
if (count == 0) {
-
- String waitingFor = ""+ops;
- if (ops == SelectionKey.OP_READ) {
- waitingFor = "read";
- } else if (ops == SelectionKey.OP_WRITE) {
- waitingFor = "write";
- }
-
- throw new SocketTimeoutException(timeout + " millis timeout while " +
- "waiting for channel to be ready for "
- + waitingFor + ". ch : " + channel);
+ throw new SocketTimeoutException(timeoutExceptionString(ops));
}
// otherwise the socket should be ready for io.
}
return 0; // does not reach here.
+ }
+
+ /**
+ * This is similar to {@link #doIO(ByteBuffer, int)} except that it
+ * does not perform any I/O. It just waits for the channel to be ready
+ * for I/O as specified in ops.
+ *
+ * @param ops Selection Ops used for waiting
+ *
+ * @throws SocketTimeoutException
+ * if select on the channel times out.
+ * @throws IOException
+ * if any other I/O error occurs.
+ */
+ void waitForIO(int ops) throws IOException {
+
+ if (selector.select(channel, ops, timeout) == 0) {
+ throw new SocketTimeoutException(timeoutExceptionString(ops));
+ }
+ }
+
+ private String timeoutExceptionString(int ops) {
+
+ String waitingFor = "" + ops;
+ if (ops == SelectionKey.OP_READ) {
+ waitingFor = "read";
+ } else if (ops == SelectionKey.OP_WRITE) {
+ waitingFor = "write";
+ }
+
+ return timeout + " millis timeout while " +
+ "waiting for channel to be ready for " +
+ waitingFor + ". ch : " + channel;
}
/**
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java?rev=651465&r1=651464&r2=651465&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java Thu Apr 24 17:10:05 2008
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
+import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
@@ -147,5 +148,18 @@
public int read(ByteBuffer dst) throws IOException {
return reader.doIO(dst, SelectionKey.OP_READ);
+ }
+
+ /**
+ * waits for the underlying channel to be ready for reading.
+ * The timeout specified for this stream applies to this wait.
+ *
+ * @throws SocketTimeoutException
+ * if select on the channel times out.
+ * @throws IOException
+ * if any other I/O error occurs.
+ */
+ public void waitForReadable() throws IOException {
+ reader.waitForIO(SelectionKey.OP_READ);
}
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java?rev=651465&r1=651464&r2=651465&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java Thu Apr 24 17:10:05 2008
@@ -18,9 +18,11 @@
package org.apache.hadoop.net;
+import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
+import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectableChannel;
@@ -143,4 +145,75 @@
public int write(ByteBuffer src) throws IOException {
return writer.doIO(src, SelectionKey.OP_WRITE);
}
+
+ /**
+ * waits for the underlying channel to be ready for writing.
+ * The timeout specified for this stream applies to this wait.
+ *
+ * @throws SocketTimeoutException
+ * if select on the channel times out.
+ * @throws IOException
+ * if any other I/O error occurs.
+ */
+ public void waitForWritable() throws IOException {
+ writer.waitForIO(SelectionKey.OP_WRITE);
+ }
+
+ /**
+ * Transfers data from FileChannel using
+ * {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
+ *
+ * Similar to readFully(), this waits until the requested amount of
+ * data is transferred.
+ *
+ * @param fileCh FileChannel to transfer data from.
+ * @param position position within the channel where the transfer begins
+ * @param count number of bytes to transfer.
+ *
+ * @throws EOFException
+ * If end of input file is reached before the requested number of
+ * bytes are transferred.
+ *
+ * @throws SocketTimeoutException
+ * If this channel blocks transfer longer than timeout for
+ * this stream.
+ *
+ * @throws IOException Includes any exception thrown by
+ * {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
+ */
+ public void transferToFully(FileChannel fileCh, long position, int count)
+ throws IOException {
+
+ while (count > 0) {
+ /*
+ * Ideally we should wait after transferTo returns 0. But because of
+ * a bug in JRE on Linux (http://bugs.sun.com/view_bug.do?bug_id=5103988),
+ * which throws an exception instead of returning 0, we wait for the
+ * channel to be writable before writing to it. If you ever see
+ * IOException with message "Resource temporarily unavailable"
+ * thrown here, please let us know.
+ *
+ * Once we move to Java SE 7, the wait should be moved to its correct place.
+ */
+ waitForWritable();
+ int nTransfered = (int) fileCh.transferTo(position, count, getChannel());
+
+ if (nTransfered == 0) {
+ //check if end of file is reached.
+ if (position >= fileCh.size()) {
+ throw new EOFException("EOF Reached. file size is " + fileCh.size() +
+ " and " + count + " more bytes left to be " +
+ "transferred.");
+ }
+ //otherwise assume the socket is full.
+ //waitForWritable(); // see comment above.
+ } else if (nTransfered < 0) {
+ throw new IOException("Unexpected return of " + nTransfered +
+ " from transferTo()");
+ } else {
+ position += nTransfered;
+ count -= nTransfered;
+ }
+ }
+ }
}
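
A hedged usage sketch of the new transferToFully() API added above. The
host, port, and file path are illustrative, and the cast assumes
NetUtils.getOutputStream() wraps channel-backed sockets in a
SocketOutputStream, as the DataNode code in this patch also relies on:

    import java.io.FileInputStream;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.nio.channels.FileChannel;
    import java.nio.channels.SocketChannel;

    import org.apache.hadoop.net.NetUtils;
    import org.apache.hadoop.net.SocketOutputStream;

    public class TransferToFullyExample {
      public static void main(String[] args) throws Exception {
        // A channel-backed socket; a plain new Socket() has no channel and
        // would not yield a SocketOutputStream.
        Socket s = SocketChannel.open(
            new InetSocketAddress("localhost", 50010)).socket(); // illustrative
        SocketOutputStream out =
            (SocketOutputStream) NetUtils.getOutputStream(s, 8 * 60 * 1000);

        FileInputStream fin = new FileInputStream("/path/to/block"); // illustrative
        FileChannel ch = fin.getChannel();

        // Send the whole file: transferToFully() loops until 'count' bytes
        // are transferred, or throws on timeout or premature EOF. Since
        // count is an int, files over 2GB would need multiple calls.
        out.transferToFully(ch, 0, (int) ch.size());

        out.close();
        fin.close();
        s.close();
      }
    }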
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestPread.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestPread.java?rev=651465&r1=651464&r2=651465&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestPread.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestPread.java Thu Apr 24 17:10:05 2008
@@ -161,11 +161,19 @@
* Tests positional read in DFS.
*/
public void testPreadDFS() throws IOException {
+ dfsPreadTest(false); //normal pread
+ dfsPreadTest(true); //trigger read code path without transferTo.
+ }
+
+ private void dfsPreadTest(boolean disableTransferTo) throws IOException {
Configuration conf = new Configuration();
conf.setLong("dfs.block.size", 4096);
conf.setLong("dfs.read.prefetch.size", 4096);
if (simulatedStorage) {
conf.setBoolean("dfs.datanode.simulateddatastorage", true);
+ }
+ if (disableTransferTo) {
+ conf.setBoolean("dfs.datanode.transferTo.allowed", false);
}
MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
FileSystem fileSys = cluster.getFileSystem();