Author: rangadi
Date: Tue May 13 23:32:42 2008
New Revision: 656118
URL: http://svn.apache.org/viewvc?rev=656118&view=rev
Log:
HADOOP-1702. Reduce buffer copies when data is written to DFS.
DataNodes take 30% less CPU while writing data. (rangadi)
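For context, a minimal sketch of the layout change on the client-to-datanode write path (illustrative names, not the Hadoop classes): instead of writing each chunk's length, checksum and data as separate small writes, the client now packs all checksums followed by all data into one pre-sized buffer and sends it with a single write().

  import java.io.DataOutputStream;
  import java.io.IOException;

  public class PacketLayoutSketch {
    // Old path: per chunk, three small writes (length, checksum, data),
    // each buffered and copied again on both ends of the pipe.
    static void writeInterleaved(DataOutputStream out, byte[][] chunks,
                                 byte[][] crcs) throws IOException {
      for (int i = 0; i < chunks.length; i++) {
        out.writeInt(chunks[i].length);
        out.write(crcs[i]);
        out.write(chunks[i]);
      }
    }

    // New path: all checksums first, then all data, packed into one
    // pre-sized array that is later sent with a single write().
    static byte[] packNonInterleaved(byte[][] chunks, byte[][] crcs) {
      int csLen = 0, dataLen = 0;
      for (byte[] c : crcs) csLen += c.length;
      for (byte[] d : chunks) dataLen += d.length;
      byte[] buf = new byte[csLen + dataLen];
      int pos = 0;
      for (byte[] c : crcs) { System.arraycopy(c, 0, buf, pos, c.length); pos += c.length; }
      for (byte[] d : chunks) { System.arraycopy(d, 0, buf, pos, d.length); pos += d.length; }
      return buf;
    }
  }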
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=656118&r1=656117&r2=656118&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue May 13 23:32:42 2008
@@ -140,6 +140,9 @@
HADOOP-3369. Fast block processing during name-node startup. (shv)
+ HADOOP-1702. Reduce buffer copies when data is written to DFS.
+ DataNodes take 30% less CPU while writing data. (rangadi)
+
BUG FIXES
HADOOP-2905. 'fsck -move' triggers NPE in NameNode.
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=656118&r1=656117&r2=656118&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue May 13 23:32:42 2008
@@ -39,6 +39,7 @@
import java.util.zip.CRC32;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentHashMap;
+import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import javax.net.SocketFactory;
@@ -71,6 +72,7 @@
private short defaultReplication;
private SocketFactory socketFactory;
private int socketTimeout;
+ final int writePacketSize;
private FileSystem.Statistics stats;
/**
@@ -144,6 +146,8 @@
this.socketTimeout = conf.getInt("dfs.socket.timeout",
FSConstants.READ_TIMEOUT);
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
+ // dfs.write.packet.size is an internal config variable
+ this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
try {
this.ugi = UnixUserGroupInformation.login(conf, true);
@@ -1683,7 +1687,6 @@
private DataInputStream blockReplyStream;
private Block block;
private long blockSize;
- private int buffersize;
private DataChecksum checksum;
private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
@@ -1694,10 +1697,8 @@
private ResponseProcessor response = null;
private long currentSeqno = 0;
private long bytesCurBlock = 0; // bytes written in current block
- private int packetSize = 0;
+ private int packetSize = 0; // write packet size, including the header.
private int chunksPerPacket = 0;
- private int chunksPerBlock = 0;
- private int chunkSize = 0;
private DatanodeInfo[] nodes = null; // list of targets for current block
private volatile boolean hasError = false;
private volatile int errorIndex = 0;
@@ -1707,56 +1708,95 @@
private boolean persistBlocks = false; // persist blocks on namenode
private class Packet {
- ByteBuffer buffer;
+ ByteBuffer buffer; // only one of buf and buffer is non-null
+ byte[] buf;
long seqno; // sequencenumber of buffer in block
long offsetInBlock; // offset in block
boolean lastPacketInBlock; // is this the last packet in block?
int numChunks; // number of chunks currently in packet
- int flushOffsetBuffer; // last full chunk that was flushed
- long flushOffsetBlock; // block offset of last full chunk flushed
+ int dataStart;
+ int dataPos;
+ int checksumStart;
+ int checksumPos;
// create a new packet
- Packet(int size, long offsetInBlock) {
- buffer = ByteBuffer.allocate(size);
- buffer.clear();
+ Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
this.lastPacketInBlock = false;
this.numChunks = 0;
this.offsetInBlock = offsetInBlock;
this.seqno = currentSeqno;
- this.flushOffsetBuffer = 0;
- this.flushOffsetBlock = 0;
currentSeqno++;
+
+ buffer = null;
+ buf = new byte[pktSize];
+
+ checksumStart = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+ checksumPos = checksumStart;
+ dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
+ dataPos = dataStart;
}
- // create a new Packet with the contents copied from the
- // specified one. Shares the same buffer.
- Packet(Packet old) {
- this.buffer = old.buffer;
- this.lastPacketInBlock = old.lastPacketInBlock;
- this.numChunks = old.numChunks;
- this.offsetInBlock = old.offsetInBlock;
- this.seqno = old.seqno;
- this.flushOffsetBuffer = old.flushOffsetBuffer;
- this.flushOffsetBlock = old.flushOffsetBlock;
- }
-
- // writes len bytes from offset off in inarray into
- // this packet.
- //
- void write(byte[] inarray, int off, int len) {
- buffer.put(inarray, off, len);
+ void writeData(byte[] inarray, int off, int len) {
+ if ( dataPos + len > buf.length) {
+ throw new BufferOverflowException();
+ }
+ System.arraycopy(inarray, off, buf, dataPos, len);
+ dataPos += len;
}
- // writes an integer into this packet.
- //
- void writeInt(int value) {
- buffer.putInt(value);
- }
-
- // sets the last flush offset of this packet.
- void setFlushOffset(int bufoff, long blockOff) {
- this.flushOffsetBuffer = bufoff;;
- this.flushOffsetBlock = blockOff;
+ void writeChecksum(byte[] inarray, int off, int len) {
+ if (checksumPos + len > dataStart) {
+ throw new BufferOverflowException();
+ }
+ System.arraycopy(inarray, off, buf, checksumPos, len);
+ checksumPos += len;
+ }
+
+ /**
+ * Returns ByteBuffer that contains one full packet, including header.
+ */
+ ByteBuffer getBuffer() {
+ /* Once this is called, no more data can be added to the packet.
+ * Setting 'buf' to null ensures that.
+ * This is called only when the packet is ready to be sent.
+ */
+ if (buffer != null) {
+ return buffer;
+ }
+
+ //prepare the header and close any gap between checksum and data.
+
+ int dataLen = dataPos - dataStart;
+ int checksumLen = checksumPos - checksumStart;
+
+ if (checksumPos != dataStart) {
+ /* move the checksum to cover the gap.
+ * This can happen for the last packet.
+ */
+ System.arraycopy(buf, checksumStart, buf,
+ dataStart - checksumLen , checksumLen);
+ }
+
+ int pktLen = SIZE_OF_INTEGER + dataLen + checksumLen;
+
+ //normally dataStart == checksumPos, i.e., offset is zero.
+ buffer = ByteBuffer.wrap(buf, dataStart - checksumPos,
+ DataNode.PKT_HEADER_LEN + pktLen);
+ buf = null;
+ buffer.mark();
+
+ /* write the header and data length.
+ * The format is described in comment before DataNode.BlockSender
+ */
+ buffer.putInt(pktLen); // pktSize
+ buffer.putLong(offsetInBlock);
+ buffer.putLong(seqno);
+ buffer.put((byte) ((lastPacketInBlock) ? 1 : 0));
+ //end of pkt header
+ buffer.putInt(dataLen); // actual data length, excluding checksum.
+
+ buffer.reset();
+ return buffer;
}
}
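As a standalone illustration of the buffer layout the new Packet class maintains (assuming the default 512-byte chunks and 4-byte CRC32 checksums; sizes are made up):

  public class PacketBufferLayout {
    static final int PKT_HEADER_LEN = 4 + 8 + 8 + 1; // pktLen, offsetInBlock, seqno, isLast
    static final int SIZE_OF_INTEGER = 4;

    public static void main(String[] args) {
      int chunksPerPkt = 4, checksumSize = 4, bytesPerChecksum = 512;
      int checksumStart = PKT_HEADER_LEN + SIZE_OF_INTEGER;        // 25
      int dataStart = checksumStart + chunksPerPkt * checksumSize; // 41
      int pktSize = dataStart + chunksPerPkt * bytesPerChecksum;   // 2089
      // buf: [ header | dataLen | checksums | data ]
      //       0        21        25          41
      // If fewer than chunksPerPkt chunks get written (e.g. the last packet
      // of a block), getBuffer() slides the checksums up against the data so
      // the unused gap between checksumPos and dataStart is never sent.
      System.out.println(checksumStart + " " + dataStart + " " + pktSize);
    }
  }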
@@ -1807,8 +1847,6 @@
try {
// get packet to be sent.
one = dataQueue.getFirst();
- int start = 0;
- int len = one.buffer.limit();
long offsetInBlock = one.offsetInBlock;
// get new block from namenode.
@@ -1821,16 +1859,6 @@
response.start();
}
- // If we are sending a sub-packet, then determine the offset
- // in block.
- if (one.flushOffsetBuffer != 0) {
- offsetInBlock += one.flushOffsetBlock;
- len = len - one.flushOffsetBuffer;
- start += one.flushOffsetBuffer;
- }
-
- // user bytes from 'position' to 'limit'.
- byte[] arr = one.buffer.array();
if (offsetInBlock >= blockSize) {
throw new IOException("BlockSize " + blockSize +
" is smaller than data size. " +
@@ -1839,6 +1867,8 @@
" Aborting file " + src);
}
+ ByteBuffer buf = one.getBuffer();
+
// move packet from dataQueue to ackQueue
dataQueue.removeFirst();
dataQueue.notifyAll();
@@ -1846,22 +1876,21 @@
ackQueue.addLast(one);
ackQueue.notifyAll();
}
-
+
// write out data to remote datanode
- blockStream.writeInt(len); // size of this packet
- blockStream.writeLong(offsetInBlock); // data offset in block
- blockStream.writeLong(one.seqno); // sequence num of packet
- blockStream.writeBoolean(one.lastPacketInBlock);
- blockStream.write(arr, start, len);
+ blockStream.write(buf.array(), buf.position(), buf.remaining());
+
if (one.lastPacketInBlock) {
blockStream.writeInt(0); // indicate end-of-block
}
blockStream.flush();
- LOG.debug("DataStreamer block " + block +
- " wrote packet seqno:" + one.seqno +
- " size:" + len +
- " offsetInBlock:" + offsetInBlock +
- " lastPacketInBlock:" + one.lastPacketInBlock);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DataStreamer block " + block +
+ " wrote packet seqno:" + one.seqno +
+ " size:" + buf.remaining() +
+ " offsetInBlock:" + one.offsetInBlock +
+ " lastPacketInBlock:" + one.lastPacketInBlock);
+ }
} catch (IOException e) {
LOG.warn("DataStreamer Exception: " + e);
hasError = true;
@@ -2138,7 +2167,6 @@
super(new CRC32(), conf.getInt("io.bytes.per.checksum", 512), 4);
this.src = src;
this.blockSize = blockSize;
- this.buffersize = buffersize;
this.progress = progress;
if (progress != null) {
LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
@@ -2154,11 +2182,11 @@
}
checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
bytesPerChecksum);
- // A maximum of 128 chunks per packet, i.e. 64K packet size.
- chunkSize = bytesPerChecksum + 2 * SIZE_OF_INTEGER; // user data & checksum
- chunksPerBlock = (int)(blockSize / bytesPerChecksum);
- chunksPerPacket = Math.min(chunksPerBlock, 128);
- packetSize = chunkSize * chunksPerPacket;
+ int chunkSize = bytesPerChecksum + checksum.getChecksumSize();
+ chunksPerPacket = Math.max((writePacketSize - DataNode.PKT_HEADER_LEN -
+ SIZE_OF_INTEGER + chunkSize-1)/chunkSize, 1);
+ packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER +
+ chunkSize * chunksPerPacket;
try {
namenode.create(
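Working through this sizing arithmetic with the defaults above (64K dfs.write.packet.size, 512-byte chunks, 4-byte CRC32) gives 127 chunks and a packet of just over 64K; a quick sketch:

  public class PacketSizing {
    public static void main(String[] args) {
      int writePacketSize = 64 * 1024;  // dfs.write.packet.size default
      int pktHeaderLen = 4 + 8 + 8 + 1; // DataNode.PKT_HEADER_LEN
      int sizeOfInt = 4;
      int chunkSize = 512 + 4;          // bytesPerChecksum + checksumSize
      int chunksPerPacket = Math.max(
          (writePacketSize - pktHeaderLen - sizeOfInt + chunkSize - 1) / chunkSize, 1);
      int packetSize = pktHeaderLen + sizeOfInt + chunkSize * chunksPerPacket;
      System.out.println(chunksPerPacket); // 127
      System.out.println(packetSize);      // 65557, just over 64K from rounding up
    }
  }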
@@ -2254,7 +2282,7 @@
//
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout),
- buffersize));
+ DataNode.SMALL_BUFFER_SIZE));
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
out.writeShort( DATA_TRANSFER_VERSION );
@@ -2351,12 +2379,6 @@
this.checksum.getChecksumSize() +
" but found to be " + checksum.length);
}
- if (len + cklen + SIZE_OF_INTEGER > chunkSize) {
- throw new IOException("writeChunk() found data of size " +
- (len + cklen + 4) +
- " that cannot be larger than chukSize " +
- chunkSize);
- }
synchronized (dataQueue) {
@@ -2370,30 +2392,30 @@
isClosed();
if (currentPacket == null) {
- currentPacket = new Packet(packetSize, bytesCurBlock);
+ currentPacket = new Packet(packetSize, chunksPerPacket,
+ bytesCurBlock);
LOG.debug("DFSClient writeChunk allocating new packet " +
currentPacket.seqno);
}
- currentPacket.writeInt(len);
- currentPacket.write(checksum, 0, cklen);
- currentPacket.write(b, offset, len);
+ currentPacket.writeChecksum(checksum, 0, cklen);
+ currentPacket.writeData(b, offset, len);
currentPacket.numChunks++;
bytesCurBlock += len;
// If packet is full, enqueue it for transmission
//
if (currentPacket.numChunks == chunksPerPacket ||
- bytesCurBlock == chunksPerBlock * bytesPerChecksum) {
+ bytesCurBlock == blockSize) {
LOG.debug("DFSClient writeChunk packet full seqno " +
currentPacket.seqno);
- currentPacket.buffer.flip();
//
// if we allocated a new packet because we encountered a block
// boundary, reset bytesCurBlock.
//
- if (bytesCurBlock == chunksPerBlock * bytesPerChecksum) {
+ if (bytesCurBlock == blockSize) {
currentPacket.lastPacketInBlock = true;
bytesCurBlock = 0;
+ lastFlushOffset = -1;
}
dataQueue.addLast(currentPacket);
dataQueue.notifyAll();
@@ -2410,66 +2432,38 @@
* datanode. Block allocations are persisted on namenode.
*/
public synchronized void fsync() throws IOException {
- Packet savePacket = null;
- int position = 0;
- long saveOffset = 0;
-
try {
- // Record the state of the current output stream.
- // This state will be reverted after the flush successfully
- // finishes. It is necessary to do this so that partial
- // checksum chunks are reused by writes that follow this
- // flush.
- if (currentPacket != null) {
- savePacket = new Packet(currentPacket);
- position = savePacket.buffer.position();
- }
- saveOffset = bytesCurBlock;
+ /* Record current blockOffset. This might be changed inside
+ * flushBuffer() where a partial checksum chunk might be flushed.
+ * After the flush, bytesCurBlock is reset back to its previous value,
+ * so any partial checksum chunk is sent now and again in the next packet.
+ */
+ long saveOffset = bytesCurBlock;
// flush checksum buffer, but keep checksum buffer intact
flushBuffer(true);
- LOG.debug("DFSClient flushInternal save position " +
- position +
- " cur position " +
- ((currentPacket != null) ? currentPacket.buffer.position() : -1) +
- " limit " +
- ((currentPacket != null) ? currentPacket.buffer.limit() : -1) +
- " bytesCurBlock " + bytesCurBlock +
- " lastFlushOffset " + lastFlushOffset);
-
- //
- // Detect the condition that we have already flushed all
- // outstanding data.
- //
- boolean skipFlush = (lastFlushOffset == bytesCurBlock &&
- savePacket != null && currentPacket != null &&
- savePacket.seqno == currentPacket.seqno);
+ LOG.debug("DFSClient flush() : saveOffset " + saveOffset +
+ " bytesCurBlock " + bytesCurBlock +
+ " lastFlushOffset " + lastFlushOffset);
- // Do the flush.
- //
- if (!skipFlush) {
+ // Flush only if we haven't already flushed till this offset.
+ if (lastFlushOffset != bytesCurBlock) {
// record the valid offset of this flush
lastFlushOffset = bytesCurBlock;
// wait for all packets to be sent and acknowledged
flushInternal();
+ } else {
+ // just discard the current packet since it has already been sent.
+ currentPacket = null;
}
// Restore state of stream. Record the last flush offset
// of the last full chunk that was flushed.
//
bytesCurBlock = saveOffset;
- currentPacket = null;
- if (savePacket != null) {
- savePacket.buffer.limit(savePacket.buffer.capacity());
- savePacket.buffer.position(position);
- savePacket.setFlushOffset(position,
- savePacket.numChunks *
- checksum.getBytesPerChecksum());
- currentPacket = savePacket;
- }
// If any new blocks were allocated since the last flush,
// then persist block locations on namenode.
@@ -2501,7 +2495,6 @@
// If there is data in the current buffer, send it across
//
if (currentPacket != null) {
- currentPacket.buffer.flip();
dataQueue.addLast(currentPacket);
dataQueue.notifyAll();
currentPacket = null;
@@ -2594,12 +2587,11 @@
// packet with empty payload.
synchronized (dataQueue) {
if (currentPacket == null && bytesCurBlock != 0) {
- currentPacket = new Packet(packetSize, bytesCurBlock);
- currentPacket.writeInt(0); // one chunk with empty contents
+ currentPacket = new Packet(packetSize, chunksPerPacket,
+ bytesCurBlock);
}
if (currentPacket != null) {
currentPacket.lastPacketInBlock = true;
- currentPacket.setFlushOffset(0, 0); // send whole packet
}
}
@@ -2649,7 +2641,9 @@
synchronized void setChunksPerPacket(int value) {
chunksPerPacket = Math.min(chunksPerPacket, value);
- packetSize = chunkSize * chunksPerPacket;
+ packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER +
+ (checksum.getBytesPerChecksum() +
+ checksum.getChecksumSize()) * chunksPerPacket;
}
synchronized void setTestFilename(String newname) {
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=656118&r1=656117&r2=656118&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue May 13 23:32:42 2008
@@ -129,6 +129,7 @@
private int socketTimeout;
private int socketWriteTimeout = 0;
private boolean transferToAllowed = true;
+ private int writePacketSize = 0;
DataBlockScanner blockScanner;
Daemon blockScannerThread;
@@ -221,6 +222,7 @@
* to false on some of them. */
this.transferToAllowed =
conf.getBoolean("dfs.datanode.transferTo.allowed", true);
+ this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
String address =
NetUtils.getServerAddress(conf,
"dfs.datanode.bindAddress",
@@ -991,7 +993,8 @@
DataInputStream in=null;
try {
in = new DataInputStream(
- new BufferedInputStream(NetUtils.getInputStream(s), BUFFER_SIZE));
+ new BufferedInputStream(NetUtils.getInputStream(s),
+ SMALL_BUFFER_SIZE));
short version = in.readShort();
if ( version != DATA_TRANSFER_VERSION ) {
throw new IOException( "Version Mismatch" );
@@ -1174,7 +1177,7 @@
mirrorOut = new DataOutputStream(
new BufferedOutputStream(
NetUtils.getOutputStream(mirrorSock, writeTimeout),
- BUFFER_SIZE));
+ SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
// Write header: Copied from DFSClient.java!
@@ -1603,6 +1606,12 @@
************************************************************************ */
+ /** Header size for a packet */
+ static final int PKT_HEADER_LEN = ( 4 + /* Packet payload length */
+ 8 + /* offset in block */
+ 8 + /* seqno */
+ 1 /* isLastPacketInBlock */);
+
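A minimal round trip of this 21-byte header plus the 4-byte data length that follows it, mirroring the putInt/putLong/put sequence in Packet.getBuffer() above (the values are made up):

  import java.nio.ByteBuffer;

  public class PktHeaderRoundTrip {
    public static void main(String[] args) {
      ByteBuffer b = ByteBuffer.allocate(21 + 4);
      b.putInt(1024);      // pktLen: payload length
      b.putLong(4096L);    // offset in block
      b.putLong(7L);       // seqno
      b.put((byte) 0);     // isLastPacketInBlock
      b.putInt(1000);      // dataLen, excluding checksums
      b.flip();
      System.out.println(b.getInt() + " " + b.getLong() + " " +
                         b.getLong() + " " + b.get() + " " + b.getInt());
    }
  }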
class BlockSender implements java.io.Closeable {
private Block block; // the block to read from
private InputStream blockIn; // data stream
@@ -1622,12 +1631,6 @@
private boolean verifyChecksum; //if true, check is verified while reading
private Throttler throttler;
- static final int PKT_HEADER_LEN = ( 4 + /* PacketLen */
- 8 + /* offset in block */
- 8 + /* seqno */
- 1 + /* isLastPacketInBlock */
- 4 /* data len */ );
-
BlockSender(Block block, long startOffset, long length,
boolean corruptChecksumOk, boolean chunkOffsetOK,
boolean verifyChecksum) throws IOException {
@@ -1873,7 +1876,7 @@
out.flush();
int maxChunksPerPacket;
- int pktSize;
+ int pktSize = PKT_HEADER_LEN + SIZE_OF_INTEGER;
if (transferToAllowed && !verifyChecksum &&
baseStream instanceof SocketOutputStream &&
@@ -1891,12 +1894,11 @@
+ bytesPerChecksum - 1)/bytesPerChecksum;
// allocate smaller buffer while using transferTo().
- pktSize = PKT_HEADER_LEN + checksumSize * maxChunksPerPacket;
+ pktSize += checksumSize * maxChunksPerPacket;
} else {
maxChunksPerPacket = Math.max(1,
(BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
- pktSize = PKT_HEADER_LEN +
- (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
+ pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
}
ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
@@ -2200,39 +2202,6 @@
}
}
- // this class is a bufferoutputstream that exposes the number of
- // bytes in the buffer.
- static private class DFSBufferedOutputStream extends BufferedOutputStream {
- OutputStream out;
- DFSBufferedOutputStream(OutputStream out, int capacity) {
- super(out, capacity);
- this.out = out;
- }
-
- public synchronized void flush() throws IOException {
- super.flush();
- }
-
- /**
- * Returns true if the channel pointer is already set at the
- * specified offset. Otherwise returns false.
- */
- synchronized boolean samePosition(FSDatasetInterface data,
- FSDataset.BlockWriteStreams streams,
- Block block,
- long offset)
- throws IOException {
- if (data.getChannelPosition(block, streams) + count == offset) {
- return true;
- }
- LOG.debug("samePosition is false. " +
- " current position " + data.getChannelPosition(block, streams)+
- " buffered size " + count +
- " new offset " + offset);
- return false;
- }
- }
-
/* A class that receives a block and writes it to its own disk, meanwhile
* may copy it to another site. If a throttler is provided,
* streaming throttling is also supported.
@@ -2242,13 +2211,13 @@
private boolean finalized;
private DataInputStream in = null; // from where data are read
private DataChecksum checksum; // from where chunks of a block can be read
- private DataOutputStream out = null; // to block file at local disk
+ private OutputStream out = null; // to block file at local disk
private DataOutputStream checksumOut = null; // to crc file at local disk
- private DFSBufferedOutputStream bufStream = null;
private int bytesPerChecksum;
private int checksumSize;
- private byte buf[];
- private byte checksumBuf[];
+ private ByteBuffer buf; // contains one full packet.
+ private int bufRead; //amount of valid data in the buf
+ private int maxPacketReadLen;
private long offsetInBlock;
final private String inAddr;
private String mirrorAddr;
@@ -2272,19 +2241,16 @@
this.checksum = DataChecksum.newDataChecksum(in);
this.bytesPerChecksum = checksum.getBytesPerChecksum();
this.checksumSize = checksum.getChecksumSize();
- this.buf = new byte[bytesPerChecksum + checksumSize];
- this.checksumBuf = new byte[checksumSize];
//
// Open local disk out
//
streams = data.writeToBlock(block, isRecovery);
this.finalized = data.isValidBlock(block);
if (streams != null) {
- this.bufStream = new DFSBufferedOutputStream(
- streams.dataOut, BUFFER_SIZE);
- this.out = new DataOutputStream(bufStream);
+ this.out = streams.dataOut;
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
- streams.checksumOut, BUFFER_SIZE));
+ streams.checksumOut,
+ SMALL_BUFFER_SIZE));
}
} catch(IOException ioe) {
IOUtils.closeStream(this);
@@ -2351,174 +2317,249 @@
}
}
- /* receive a chunk: write it to disk & mirror it to another stream */
- private void receiveChunk( int len, byte[] checksumBuf, int checksumOff )
- throws IOException {
- if (len <= 0 || len > bytesPerChecksum) {
- throw new IOException("Got wrong length during writeBlock(" + block
- + ") from " + inAddr + " at offset " + offsetInBlock + ": " + len
- + " expected <= " + bytesPerChecksum);
- }
+ /**
+ * Verify multiple CRC chunks.
+ */
+ private void verifyChunks( byte[] dataBuf, int dataOff, int len,
+ byte[] checksumBuf, int checksumOff )
+ throws IOException {
+ while (len > 0) {
+ int chunkLen = Math.min(len, bytesPerChecksum);
+
+ checksum.update(dataBuf, dataOff, chunkLen);
- in.readFully(buf, 0, len);
-
- /*
- * Verification is not included in the initial design. For now, it at
- * least catches some bugs. Later, we can include this after showing that
- * it does not affect performance much.
- */
- checksum.update(buf, 0, len);
+ if (!checksum.compare(checksumBuf, checksumOff)) {
+ throw new IOException("Unexpected checksum mismatch " +
+ "while writing " + block + " from " + inAddr);
+ }
- if (!checksum.compare(checksumBuf, checksumOff)) {
- throw new IOException("Unexpected checksum mismatch "
- + "while writing " + block + " from " + inAddr);
+ checksum.reset();
+ dataOff += chunkLen;
+ checksumOff += checksumSize;
+ len -= chunkLen;
}
+ }
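The same loop, reduced to a standalone sketch that uses java.util.zip.CRC32 directly in place of Hadoop's DataChecksum (this assumes each checksum is a big-endian 32-bit CRC, as written by DataOutputStream.writeInt):

  import java.util.zip.CRC32;

  public class ChunkVerifySketch {
    static void verify(byte[] data, byte[] crcs, int bytesPerChecksum) {
      CRC32 crc = new CRC32();
      int dataOff = 0, crcOff = 0, len = data.length;
      while (len > 0) {
        int chunkLen = Math.min(len, bytesPerChecksum);
        crc.update(data, dataOff, chunkLen);
        long expected = ((crcs[crcOff] & 0xffL) << 24) | ((crcs[crcOff + 1] & 0xffL) << 16)
                      | ((crcs[crcOff + 2] & 0xffL) << 8) | (crcs[crcOff + 3] & 0xffL);
        if (crc.getValue() != expected) {
          throw new RuntimeException("checksum mismatch at offset " + dataOff);
        }
        crc.reset();
        dataOff += chunkLen; crcOff += 4; len -= chunkLen;
      }
    }
  }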
- checksum.reset();
- offsetInBlock += len;
-
- // First write to remote node before writing locally.
- if (mirrorOut != null) {
- try {
- mirrorOut.writeInt(len);
- mirrorOut.write(checksumBuf, checksumOff, checksumSize);
- mirrorOut.write(buf, 0, len);
- } catch (IOException ioe) {
- handleMirrorOutError(ioe);
+ /**
+ * Makes sure buf.position() is zero without modifying buf.remaining().
+ * It moves the data if position needs to be changed.
+ */
+ private void shiftBufData() {
+ if (bufRead != buf.limit()) {
+ throw new IllegalStateException("bufRead should be same as " +
+ "buf.limit()");
+ }
+
+ //shift the remaining data on buf to the front
+ if (buf.position() > 0) {
+ int dataLeft = buf.remaining();
+ if (dataLeft > 0) {
+ byte[] b = buf.array();
+ System.arraycopy(b, buf.position(), b, 0, dataLeft);
+ }
+ buf.position(0);
+ bufRead = dataLeft;
+ buf.limit(bufRead);
+ }
+ }
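shiftBufData() hand-rolls what ByteBuffer offers as compact(); a sketch of the equivalent, minus the bufRead bookkeeping this class keeps alongside the buffer:

  import java.nio.ByteBuffer;

  public class CompactSketch {
    static void shift(ByteBuffer buf) {
      buf.compact(); // copies [position, limit) to offset 0; position = bytes kept
      buf.flip();    // position = 0, limit = bytes kept, same as shiftBufData()
    }
  }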
+
+ /**
+ * reads up to toRead bytes into buf at buf.limit() and increments the limit.
+ * throws an IOException if read does not succeed.
+ */
+ private int readToBuf(int toRead) throws IOException {
+ if (toRead < 0) {
+ toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity())
+ - buf.limit();
+ }
+
+ int nRead = in.read(buf.array(), buf.limit(), toRead);
+
+ if (nRead < 0) {
+ throw new EOFException("while trying to read " + toRead + " bytes");
+ }
+ bufRead = buf.limit() + nRead;
+ buf.limit(bufRead);
+ return nRead;
+ }
+
+
+ /**
+ * Reads (at least) one packet and returns the packet length.
+ * buf.position() points to the start of the packet and
+ * buf.limit() points to the end of the packet. There could
+ * be more data from the next packet in buf.<br><br>
+ *
+ * It tries to read a full packet with a single read call.
+ * Consecutive packets are usually of the same length.
+ */
+ private int readNextPacket() throws IOException {
+ /* This dances around buf a little bit, mainly to read a
+ * full packet with a single read and to accept an arbitrary size
+ * for the next packet at the same time.
+ */
+ if (buf == null) {
+ /* initialize buffer to the best guess size:
+ * 'chunksPerPacket' calculation here should match the same
+ * calculation in DFSClient to make the guess accurate.
+ */
+ int chunkSize = bytesPerChecksum + checksumSize;
+ int chunksPerPacket = (writePacketSize - PKT_HEADER_LEN -
+ SIZE_OF_INTEGER + chunkSize - 1)/chunkSize;
+ buf = ByteBuffer.allocate(PKT_HEADER_LEN + SIZE_OF_INTEGER +
+ Math.max(chunksPerPacket, 1) * chunkSize);
+ buf.limit(0);
+ }
+
+ // See if there is data left in the buffer :
+ if (bufRead > buf.limit()) {
+ buf.limit(bufRead);
+ }
+
+ while (buf.remaining() < SIZE_OF_INTEGER) {
+ if (buf.position() > 0) {
+ shiftBufData();
}
+ readToBuf(-1);
}
-
- try {
- if (!finalized) {
- out.write(buf, 0, len);
- // Write checksum
- checksumOut.write(checksumBuf, checksumOff, checksumSize);
- myMetrics.bytesWritten.inc(len);
+
+ /* By now we mostly have the full packet, or at least enough for an int.
+ */
+ buf.mark();
+ int payloadLen = buf.getInt();
+ buf.reset();
+
+ if (payloadLen == 0) {
+ //end of stream!
+ buf.limit(buf.position() + SIZE_OF_INTEGER);
+ return 0;
+ }
+
+ // check for corrupt values of payloadLen; a 100MB upper limit should be ok
+ if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
+ throw new IOException("Incorrect value for packet payload : " +
+ payloadLen);
+ }
+
+ int pktSize = payloadLen + PKT_HEADER_LEN;
+
+ if (buf.remaining() < pktSize) {
+ //we need to read more data
+ int toRead = pktSize - buf.remaining();
+
+ // first make sure buf has enough space.
+ int spaceLeft = buf.capacity() - buf.limit();
+ if (toRead > spaceLeft && buf.position() > 0) {
+ shiftBufData();
+ spaceLeft = buf.capacity() - buf.limit();
+ }
+ if (toRead > spaceLeft) {
+ byte oldBuf[] = buf.array();
+ int toCopy = buf.limit();
+ buf = ByteBuffer.allocate(toCopy + toRead);
+ System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
+ buf.limit(toCopy);
+ }
+
+ //now read:
+ while (toRead > 0) {
+ toRead -= readToBuf(toRead);
}
- } catch (IOException iex) {
- checkDiskError(iex);
- throw iex;
}
-
- if (throttler != null) { // throttle I/O
- throttler.throttle(len + checksumSize + 4);
+
+ if (buf.remaining() > pktSize) {
+ buf.limit(buf.position() + pktSize);
+ }
+
+ if (pktSize > maxPacketReadLen) {
+ maxPacketReadLen = pktSize;
}
+
+ return payloadLen;
}
-
- /*
- * Receive and process a packet. It contains many chunks.
+
+ /**
+ * Receives and processes a packet. It can contain many chunks.
+ * Returns the size of the packet.
*/
- private void receivePacket(int packetSize) throws IOException {
- /* TEMP: Currently this handles both interleaved
- * and non-interleaved DATA_CHUNKs in side the packet.
- * non-interleaved is required for HADOOP-2758 and in future.
- * iterleaved will be removed once extra buffer copies are removed
- * in write path (HADOOP-1702).
- *
- * Format of Non-interleaved data packets is described in the
- * comment before BlockSender.
- */
- offsetInBlock = in.readLong(); // get offset of packet in block
- long seqno = in.readLong(); // get seqno
- boolean lastPacketInBlock = in.readBoolean();
- int curPacketSize = 0;
- LOG.debug("Receiving one packet for block " + block +
- " of size " + packetSize +
- " seqno " + seqno +
- " offsetInBlock " + offsetInBlock +
- " lastPacketInBlock " + lastPacketInBlock);
+ private int receivePacket() throws IOException {
+
+ int payloadLen = readNextPacket();
+
+ if (payloadLen <= 0) {
+ return payloadLen;
+ }
+
+ buf.mark();
+ //read the header
+ buf.getInt(); // packet length
+ offsetInBlock = buf.getLong(); // get offset of packet in block
+ long seqno = buf.getLong(); // get seqno
+ boolean lastPacketInBlock = (buf.get() != 0);
+
+ int endOfHeader = buf.position();
+ buf.reset();
+
+ if (LOG.isDebugEnabled()){
+ LOG.debug("Receiving one packet for block " + block +
+ " of length " + payloadLen +
+ " seqno " + seqno +
+ " offsetInBlock " + offsetInBlock +
+ " lastPacketInBlock " + lastPacketInBlock);
+ }
+
setBlockPosition(offsetInBlock);
- int len = in.readInt();
- curPacketSize += 4; // read an integer in previous line
-
- // send packet header to next datanode in pipeline
+ //First write the packet to the mirror:
if (mirrorOut != null) {
try {
- int mirrorPacketSize = packetSize;
- if (len > bytesPerChecksum) {
- /*
- * This is a packet with non-interleaved checksum.
- * But we are sending interleaving checksums to mirror,
- * which changes packet len. Adjust the packet size for mirror.
- *
- * As mentioned above, this is mismatch is
- * temporary till HADOOP-1702.
- */
-
- //find out how many chunks are in this patcket :
- int chunksInPkt = (len + bytesPerChecksum - 1)/bytesPerChecksum;
-
- // we send 4 more bytes for for each of the extra
- // checksum chunks. so :
- mirrorPacketSize += (chunksInPkt - 1) * 4;
- }
- mirrorOut.writeInt(mirrorPacketSize);
- mirrorOut.writeLong(offsetInBlock);
- mirrorOut.writeLong(seqno);
- mirrorOut.writeBoolean(lastPacketInBlock);
+ mirrorOut.write(buf.array(), buf.position(), buf.remaining());
+ mirrorOut.flush();
} catch (IOException e) {
handleMirrorOutError(e);
}
}
+ buf.position(endOfHeader);
+ int len = buf.getInt();
+
+ if (len < 0) {
+ throw new IOException("Got wrong length during writeBlock(" + block +
+ ") from " + inAddr + " at offset " +
+ offsetInBlock + ": " + len);
+ }
+
if (len == 0) {
- LOG.info("Receiving empty packet for block " + block);
- if (mirrorOut != null) {
- try {
- mirrorOut.writeInt(len);
- mirrorOut.flush();
- } catch (IOException e) {
- handleMirrorOutError(e);
- }
- }
- }
+ LOG.debug("Receiving empty packet for block " + block);
+ } else {
+ offsetInBlock += len;
- while (len != 0) {
- int checksumOff = 0;
- if (len > 0) {
- int checksumLen = (len + bytesPerChecksum - 1)/bytesPerChecksum*
- checksumSize;
- if (checksumBuf.length < checksumLen) {
- checksumBuf = new byte[checksumLen];
- }
- // read the checksum
- in.readFully(checksumBuf, 0, checksumLen);
- }
-
- while (len != 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Receiving one chunk for block " + block +
- " of size " + len);
- }
-
- int toRecv = Math.min(len, bytesPerChecksum);
-
- curPacketSize += (toRecv + checksumSize);
- if (curPacketSize > packetSize) {
- throw new IOException("Packet size for block " + block +
- " too long " + curPacketSize +
- " was expecting " + packetSize);
- }
-
- receiveChunk(toRecv, checksumBuf, checksumOff);
-
- len -= toRecv;
- checksumOff += checksumSize;
+ int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
+ checksumSize;
+
+ if ( buf.remaining() != (checksumLen + len)) {
+ throw new IOException("Data remaining in packet does not match " +
+ "sum of checksumLen and dataLen");
}
-
- if (curPacketSize == packetSize) {
- if (mirrorOut != null) {
- try {
- mirrorOut.flush();
- } catch (IOException e) {
- handleMirrorOutError(e);
- }
- }
- break;
+ int checksumOff = buf.position();
+ int dataOff = checksumOff + checksumLen;
+ byte pktBuf[] = buf.array();
+
+ buf.position(buf.limit()); // move to the end of the data.
+
+ verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+
+ try {
+ if (!finalized) {
+ //finally write to the disk :
+ out.write(pktBuf, dataOff, len);
+ checksumOut.write(pktBuf, checksumOff, checksumLen);
+ myMetrics.bytesWritten.inc(len);
+ }
+ } catch (IOException iex) {
+ checkDiskError(iex);
+ throw iex;
}
- len = in.readInt();
- curPacketSize += 4;
}
/// flush entire packet before sending ack
@@ -2529,6 +2570,12 @@
((PacketResponder)responder.getRunnable()).enqueue(seqno,
lastPacketInBlock);
}
+
+ if (throttler != null) { // throttle I/O
+ throttler.throttle(payloadLen);
+ }
+
+ return payloadLen;
}
public void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -2562,13 +2609,9 @@
}
/*
- * Skim packet headers. A response is needed for every packet.
+ * Receive until packet length is zero.
*/
- int len = in.readInt(); // get packet size
- while (len != 0) {
- receivePacket(len);
- len = in.readInt(); // get packet size
- }
+ while (receivePacket() > 0) {}
// flush the mirror out
if (mirrorOut != null) {
@@ -2637,8 +2680,9 @@
}
return;
}
- if (bufStream.samePosition(data, streams, block, offsetInBlock)) {
- return;
+
+ if (data.getChannelPosition(block, streams) == offsetInBlock) {
+ return; // nothing to do
}
if (offsetInBlock % bytesPerChecksum != 0) {
throw new IOException("setBlockPosition trying to set position to " +
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=656118&r1=656117&r2=656118&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Tue May 13 23:32:42 2008
@@ -101,12 +101,11 @@
* when protocol changes. It is not very obvious.
*/
/*
- * Version 9:
- * While reading data from Datanode, each PACKET can consist
- * of non-interleaved data (check for for larger amount of data,
- * followed by data).
+ * Version 10:
+ * DFSClient also sends non-interleaved checksum and data while writing
+ * to DFS.
*/
- public static final int DATA_TRANSFER_VERSION = 9;
+ public static final int DATA_TRANSFER_VERSION = 10;
// Return codes for file create
public static final int OPERATION_FAILED = 0;
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java?rev=656118&r1=656117&r2=656118&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java Tue May 13 23:32:42 2008
@@ -204,7 +204,7 @@
sendOut.writeInt(0);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt((int)512);
- sendOut.writeInt(20); // size of packet
+ sendOut.writeInt(4); // size of packet
sendOut.writeLong(0); // OffsetInBlock
sendOut.writeLong(100); // sequencenumber
sendOut.writeBoolean(false); // lastPacketInBlock
@@ -229,7 +229,7 @@
sendOut.writeInt(0);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt((int)512); // checksum size
- sendOut.writeInt(20); // size of packet
+ sendOut.writeInt(8); // size of packet
sendOut.writeLong(0); // OffsetInBlock
sendOut.writeLong(100); // sequencenumber
sendOut.writeBoolean(true); // lastPacketInBlock
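The 20 → 4 and 20 → 8 changes follow from the new framing: under DATA_TRANSFER_VERSION 10 the leading int counts only the payload (the 4-byte dataLen field plus checksums plus data), not the whole packet. Assuming the test sends zero data bytes, with the last packet presumably carrying one 4-byte CRC for its empty chunk:

  public class TestPacketSizes {
    public static void main(String[] args) {
      int sizeOfInt = 4, checksumSize = 4;
      System.out.println(sizeOfInt + 0 + 0);            // 4: empty packet
      System.out.println(sizeOfInt + checksumSize + 0); // 8: empty last packet
    }
  }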